activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r1171874 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x: ./ src/main/csharp/Transport/ src/test/csharp/ src/test/csharp/OpenWire/
Date Sat, 17 Sep 2011 00:23:26 GMT
Author: jgomes
Date: Sat Sep 17 00:23:26 2011
New Revision: 1171874

URL: http://svn.apache.org/viewvc?rev=1171874&view=rev
Log:
Fix with unit tests for thread leak.
Fixes [AMQNET-343]. (See https://issues.apache.org/jira/browse/AMQNET-343)

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/nmsprovider-test.config
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/InactivityMonitor.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/NetTxConnectionFactoryTest.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/nmsprovider-test.config
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/nmsprovider-test.config?rev=1171874&r1=1171873&r2=1171874&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/nmsprovider-test.config (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/nmsprovider-test.config Sat
Sep 17 00:23:26 2011
@@ -21,8 +21,6 @@
 		<passWord value="manager"/>
 	</defaultURI>
 
-	<maxInactivityDurationURI value="activemq:tcp://${activemqhost}:61616?wireFormat.MaxInactivityDurationInitialDelay=5000&amp;wireFormat.MaxInactivityDuration=10000&amp;connection.AsyncClose=false"/>
-
 	<openWireURI value="activemq:tcp://${activemqhost}:61616?connection.AsyncClose=false">
 		<factoryParams>
 			<param type="string" value="OpenWireTestClient"/>

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/InactivityMonitor.cs?rev=1171874&r1=1171873&r2=1171874&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/InactivityMonitor.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/InactivityMonitor.cs
Sat Sep 17 00:23:26 2011
@@ -23,444 +23,451 @@ using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
-    /// <summary>
-    /// This class make sure that the connection is still alive,
-    /// by monitoring the reception of commands from the peer of
-    /// the transport.
-    /// </summary>
-    public class InactivityMonitor : TransportFilter
-    {
-        private readonly Atomic<bool> monitorStarted = new Atomic<bool>(false);
-
-        private readonly Atomic<bool> commandSent = new Atomic<bool>(false);
-        private readonly Atomic<bool> commandReceived = new Atomic<bool>(false);
-
-        private readonly Atomic<bool> failed = new Atomic<bool>(false);
-        private readonly Atomic<bool> inRead = new Atomic<bool>(false);
-        private readonly Atomic<bool> inWrite = new Atomic<bool>(false);
-
-        private CompositeTaskRunner asyncTasks;
-        private AsyncSignalReadErrorkTask asyncErrorTask;
-        private AsyncWriteTask asyncWriteTask;
-
-        private readonly Mutex monitor = new Mutex();
-
-        private Timer connectionCheckTimer;
-
-        private DateTime lastReadCheckTime;
-
-        private static int id = 0;
-        private readonly int instanceId = 0;
-        private bool disposing = false;
-
-        private long readCheckTime;
-        public long ReadCheckTime
-        {
-            get { return this.readCheckTime; }
-            set { this.readCheckTime = value; }
-        }
-
-        private long writeCheckTime;
-        public long WriteCheckTime
-        {
-            get { return this.writeCheckTime; }
-            set { this.writeCheckTime = value; }
-        }
-
-        private long initialDelayTime;
-        public long InitialDelayTime
-        {
-            get { return this.initialDelayTime; }
-            set { this.initialDelayTime = value; }
-        }
-
-        private readonly Atomic<bool> keepAliveResponseRequired = new Atomic<bool>(false);
-        public bool KeepAliveResponseRequired
-        {
-            get { return this.keepAliveResponseRequired.Value; }
-            set { keepAliveResponseRequired.Value = value; }
-        }
-
-        // Local and remote Wire Format Information
-        private WireFormatInfo localWireFormatInfo;
-        private WireFormatInfo remoteWireFormatInfo;
-
-        /// <summary>
-        /// Constructor or the Inactivity Monitor
-        /// </summary>
-        /// <param name="next"></param>
-        public InactivityMonitor(ITransport next)
-            : base(next)
-        {
-            this.instanceId = ++id;
-            Tracer.Debug("Creating Inactivity Monitor: " + instanceId);
-        }
-
-        ~InactivityMonitor()
-        {
-            Dispose(false);
-        }
-
-        protected override void Dispose(bool disposing)
-        {
-            if(disposing)
-            {
-                // get rid of unmanaged stuff
-            }
-
-            lock(monitor)
-            {
-                this.localWireFormatInfo = null;
-                this.remoteWireFormatInfo = null;
-                this.disposing = true;
-                StopMonitorThreads();
-            }
-
-            base.Dispose(disposing);
-        }
-		
+	/// <summary>
+	/// This class make sure that the connection is still alive,
+	/// by monitoring the reception of commands from the peer of
+	/// the transport.
+	/// </summary>
+	public class InactivityMonitor : TransportFilter
+	{
+		private readonly Atomic<bool> monitorStarted = new Atomic<bool>(false);
+
+		private readonly Atomic<bool> commandSent = new Atomic<bool>(false);
+		private readonly Atomic<bool> commandReceived = new Atomic<bool>(false);
+
+		private readonly Atomic<bool> failed = new Atomic<bool>(false);
+		private readonly Atomic<bool> inRead = new Atomic<bool>(false);
+		private readonly Atomic<bool> inWrite = new Atomic<bool>(false);
+
+		private CompositeTaskRunner asyncTasks;
+		private AsyncSignalReadErrorkTask asyncErrorTask;
+		private AsyncWriteTask asyncWriteTask;
+
+		private readonly Mutex monitor = new Mutex();
+
+		private Timer connectionCheckTimer;
+
+		private DateTime lastReadCheckTime;
+
+		private static int id = 0;
+		private readonly int instanceId = 0;
+		private bool disposing = false;
+
+		private long readCheckTime;
+		public long ReadCheckTime
+		{
+			get { return this.readCheckTime; }
+			set { this.readCheckTime = value; }
+		}
+
+		private long writeCheckTime;
+		public long WriteCheckTime
+		{
+			get { return this.writeCheckTime; }
+			set { this.writeCheckTime = value; }
+		}
+
+		private long initialDelayTime;
+		public long InitialDelayTime
+		{
+			get { return this.initialDelayTime; }
+			set { this.initialDelayTime = value; }
+		}
+
+		private readonly Atomic<bool> keepAliveResponseRequired = new Atomic<bool>(false);
+		public bool KeepAliveResponseRequired
+		{
+			get { return this.keepAliveResponseRequired.Value; }
+			set { keepAliveResponseRequired.Value = value; }
+		}
+
+		// Local and remote Wire Format Information
+		private WireFormatInfo localWireFormatInfo;
+		private WireFormatInfo remoteWireFormatInfo;
+
+		/// <summary>
+		/// Constructor or the Inactivity Monitor
+		/// </summary>
+		/// <param name="next"></param>
+		public InactivityMonitor(ITransport next)
+			: base(next)
+		{
+			this.instanceId = ++id;
+			Tracer.Debug("Creating Inactivity Monitor: " + instanceId);
+		}
+
+		~InactivityMonitor()
+		{
+			Dispose(false);
+		}
+
+		protected override void Dispose(bool disposing)
+		{
+			if(disposing)
+			{
+				// get rid of unmanaged stuff
+			}
+
+			lock(monitor)
+			{
+				this.localWireFormatInfo = null;
+				this.remoteWireFormatInfo = null;
+				this.disposing = true;
+				StopMonitorThreads();
+			}
+
+			base.Dispose(disposing);
+		}
+
 		public void CheckConnection(object state)
 		{
 			// First see if we have written or can write.
 			WriteCheck();
-			
+
 			// Now check is we've read anything, if not then we send
 			// a new KeepAlive with response required.
 			ReadCheck();
 		}
 
-        #region WriteCheck Related
-        /// <summary>
-        /// Check the write to the broker
-        /// </summary>
-        public void WriteCheck()
-        {
-            if(this.inWrite.Value || this.failed.Value)
-            {
-                Tracer.DebugFormat("InactivityMonitor[{0}]: is in write or already failed.",
instanceId);
-                return;
-            }
-
-            if(!commandSent.Value)
-            {
-                Tracer.DebugFormat("InactivityMonitor[{0}]: No Message sent since last write
check. Sending a KeepAliveInfo.", instanceId);
-                this.asyncWriteTask.IsPending = true;
-                this.asyncTasks.Wakeup();
-            }
-            else
-            {
-                Tracer.DebugFormat("InactivityMonitor[{0}]: Message sent since last write
check. Resetting flag.", instanceId);
-            }
-
-            commandSent.Value = false;
-        }
-        #endregion
-
-        #region ReadCheck Related
-        public void ReadCheck()
-        {
-            DateTime now = DateTime.Now;
-            TimeSpan elapsed = now - this.lastReadCheckTime;
-
-            if(!AllowReadCheck(elapsed))
-            {
-                Tracer.Debug("InactivityMonitor["+ instanceId +"]: A read check is not currently
allowed.");
-                return;
-            }
-
-            this.lastReadCheckTime = now;
-
-            if(this.inRead.Value || this.failed.Value)
-            {
-                Tracer.DebugFormat("InactivityMonitor[{0}]: A receive is in progress or already
failed.", instanceId);
-                return;
-            }
-
-            if(!commandReceived.Value)
-            {
-                Tracer.DebugFormat("InactivityMonitor[{0}]: No message received since last
read check! Sending an InactivityException!", instanceId);
-                this.asyncErrorTask.IsPending = true;
-                this.asyncTasks.Wakeup();
-            }
-            else
-            {
-                commandReceived.Value = false;
-            }
-        }
-
-        /// <summary>
-        /// Checks if we should allow the read check(if less than 90% of the read
-        /// check time elapsed then we dont do the readcheck
-        /// </summary>
-        /// <param name="elapsed"></param>
-        /// <returns></returns>
-        public bool AllowReadCheck(TimeSpan elapsed)
-        {
-            return (elapsed.TotalMilliseconds > (readCheckTime * 9 / 10));
-        }
-        #endregion
-
-        public override void Stop()
-        {
-            StopMonitorThreads();
-            next.Stop();
-        }
-
-        protected override void OnCommand(ITransport sender, Command command)
-        {
-            commandReceived.Value = true;
-            inRead.Value = true;
-            try
-            {
-                if(command.IsKeepAliveInfo)
-                {
-                    KeepAliveInfo info = command as KeepAliveInfo;
-                    if(info.ResponseRequired)
-                    {
-                        try
-                        {
-                            info.ResponseRequired = false;
-                            Oneway(info);
-                        }
-                        catch(IOException ex)
-                        {
-                            OnException(this, ex);
-                        }
-                    }
-                }
-                else if(command.IsWireFormatInfo)
-                {
-                    lock(monitor)
-                    {
-                        remoteWireFormatInfo = command as WireFormatInfo;
-                        try
-                        {
-                            StartMonitorThreads();
-                        }
-                        catch(IOException ex)
-                        {
-                            OnException(this, ex);
-                        }
-                    }
-                }
-                base.OnCommand(sender, command);
-            }
-            finally
-            {
-                inRead.Value = false;
-            }
-        }
-
-        public override void Oneway(Command command)
-        {
-            // Disable inactivity monitoring while processing a command.
-            //synchronize this method - its not synchronized
-            //further down the transport stack and gets called by more
-            //than one thread  by this class
-            lock(inWrite)
-            {
-                inWrite.Value = true;
-                try
-                {
-                    if(failed.Value)
-                    {
-                        throw new IOException("Channel was inactive for too long: " + next.RemoteAddress.ToString());
-                    }
-                    if(command.IsWireFormatInfo)
-                    {
-                        lock(monitor)
-                        {
-                            localWireFormatInfo = command as WireFormatInfo;
-                            StartMonitorThreads();
-                        }
-                    }
-                    next.Oneway(command);
-                }
-                finally
-                {
-                    commandSent.Value = true;
-                    inWrite.Value = false;
-                }
-            }
-        }
-
-        protected override void OnException(ITransport sender, Exception command)
-        {
-            if(failed.CompareAndSet(false, true) && !this.disposing)
-            {
-                Tracer.Debug("Exception received in the Inactivity Monitor: " + command.ToString());
-                StopMonitorThreads();
-                base.OnException(sender, command);
-            }
-        }
-
-        private void StartMonitorThreads()
-        {
-            lock(monitor)
-            {
-                if(this.IsDisposed || this.disposing)
-                {
-                    return;
-                }
-
-                if(monitorStarted.Value)
-                {
-                    return;
-                }
-
-                if(localWireFormatInfo == null)
-                {
-                    return;
-                }
-
-                if(remoteWireFormatInfo == null)
-                {
-                    return;
-                }
-
-                readCheckTime =
-                    Math.Min(
-                        localWireFormatInfo.MaxInactivityDuration,
-                        remoteWireFormatInfo.MaxInactivityDuration);
-                initialDelayTime =
-                    Math.Min(
-                        localWireFormatInfo.MaxInactivityDurationInitialDelay,
-                        remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
-				
-				Tracer.DebugFormat("InactivityMonitor[{0}]: Read Check time interval: {1}",
-                                   instanceId, readCheckTime );
-				Tracer.DebugFormat("InactivityMonitor[{0}]: Initial Delay time interval: {1}",
-                                   instanceId, initialDelayTime );
-
-                this.asyncTasks = new CompositeTaskRunner("InactivityMonitor[" + instanceId
+ "].Runner");
-
-                this.asyncErrorTask = new AsyncSignalReadErrorkTask(this, next.RemoteAddress);
-                this.asyncWriteTask = new AsyncWriteTask(this);
-
-                this.asyncTasks.AddTask(this.asyncErrorTask);
-                this.asyncTasks.AddTask(this.asyncWriteTask);
-
-                if(readCheckTime > 0)
-                {
-                    monitorStarted.Value = true;
+		#region WriteCheck Related
+		/// <summary>
+		/// Check the write to the broker
+		/// </summary>
+		public void WriteCheck()
+		{
+			if(this.inWrite.Value || this.failed.Value)
+			{
+				Tracer.DebugFormat("InactivityMonitor[{0}]: is in write or already failed.", instanceId);
+				return;
+			}
+
+			if(!commandSent.Value)
+			{
+				Tracer.DebugFormat("InactivityMonitor[{0}]: No Message sent since last write check. Sending
a KeepAliveInfo.", instanceId);
+				this.asyncWriteTask.IsPending = true;
+				this.asyncTasks.Wakeup();
+			}
+			else
+			{
+				Tracer.DebugFormat("InactivityMonitor[{0}]: Message sent since last write check. Resetting
flag.", instanceId);
+			}
+
+			commandSent.Value = false;
+		}
+		#endregion
+
+		#region ReadCheck Related
+		public void ReadCheck()
+		{
+			DateTime now = DateTime.Now;
+			TimeSpan elapsed = now - this.lastReadCheckTime;
+
+			if(!AllowReadCheck(elapsed))
+			{
+				Tracer.Debug("InactivityMonitor[" + instanceId + "]: A read check is not currently allowed.");
+				return;
+			}
+
+			this.lastReadCheckTime = now;
+
+			if(this.inRead.Value || this.failed.Value)
+			{
+				Tracer.DebugFormat("InactivityMonitor[{0}]: A receive is in progress or already failed.",
instanceId);
+				return;
+			}
+
+			if(!commandReceived.Value)
+			{
+				Tracer.DebugFormat("InactivityMonitor[{0}]: No message received since last read check!
Sending an InactivityException!", instanceId);
+				this.asyncErrorTask.IsPending = true;
+				this.asyncTasks.Wakeup();
+			}
+			else
+			{
+				commandReceived.Value = false;
+			}
+		}
 
-                    writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 : readCheckTime;
+		/// <summary>
+		/// Checks if we should allow the read check(if less than 90% of the read
+		/// check time elapsed then we dont do the readcheck
+		/// </summary>
+		/// <param name="elapsed"></param>
+		/// <returns></returns>
+		public bool AllowReadCheck(TimeSpan elapsed)
+		{
+			return (elapsed.TotalMilliseconds > (readCheckTime * 9 / 10));
+		}
+		#endregion
+
+		public override void Stop()
+		{
+			StopMonitorThreads();
+			next.Stop();
+		}
+
+		protected override void OnCommand(ITransport sender, Command command)
+		{
+			commandReceived.Value = true;
+			inRead.Value = true;
+			try
+			{
+				if(command.IsKeepAliveInfo)
+				{
+					KeepAliveInfo info = command as KeepAliveInfo;
+					if(info.ResponseRequired)
+					{
+						try
+						{
+							info.ResponseRequired = false;
+							Oneway(info);
+						}
+						catch(IOException ex)
+						{
+							OnException(this, ex);
+						}
+					}
+				}
+				else if(command.IsWireFormatInfo)
+				{
+					lock(monitor)
+					{
+						remoteWireFormatInfo = command as WireFormatInfo;
+						try
+						{
+							StartMonitorThreads();
+						}
+						catch(IOException ex)
+						{
+							OnException(this, ex);
+						}
+					}
+				}
+				base.OnCommand(sender, command);
+			}
+			finally
+			{
+				inRead.Value = false;
+			}
+		}
+
+		public override void Oneway(Command command)
+		{
+			// Disable inactivity monitoring while processing a command.
+			//synchronize this method - its not synchronized
+			//further down the transport stack and gets called by more
+			//than one thread  by this class
+			lock(inWrite)
+			{
+				inWrite.Value = true;
+				try
+				{
+					if(failed.Value)
+					{
+						throw new IOException("Channel was inactive for too long: " + next.RemoteAddress.ToString());
+					}
+					if(command.IsWireFormatInfo)
+					{
+						lock(monitor)
+						{
+							localWireFormatInfo = command as WireFormatInfo;
+							StartMonitorThreads();
+						}
+					}
+					next.Oneway(command);
+				}
+				finally
+				{
+					commandSent.Value = true;
+					inWrite.Value = false;
+				}
+			}
+		}
+
+		protected override void OnException(ITransport sender, Exception command)
+		{
+			if(failed.CompareAndSet(false, true) && !this.disposing)
+			{
+				Tracer.Debug("Exception received in the Inactivity Monitor: " + command.ToString());
+				StopMonitorThreads();
+				base.OnException(sender, command);
+			}
+		}
+
+		private void StartMonitorThreads()
+		{
+			lock(monitor)
+			{
+				if(this.IsDisposed || this.disposing)
+				{
+					return;
+				}
+
+				if(monitorStarted.Value)
+				{
+					return;
+				}
+
+				if(localWireFormatInfo == null)
+				{
+					return;
+				}
+
+				if(remoteWireFormatInfo == null)
+				{
+					return;
+				}
+
+				readCheckTime =
+					Math.Min(
+						localWireFormatInfo.MaxInactivityDuration,
+						remoteWireFormatInfo.MaxInactivityDuration);
+				initialDelayTime =
+					Math.Min(
+						localWireFormatInfo.MaxInactivityDurationInitialDelay,
+						remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
+
+				if(readCheckTime > 0)
+				{
+					Tracer.DebugFormat("InactivityMonitor[{0}]: Read Check time interval: {1}",
+								   instanceId, readCheckTime);
+					Tracer.DebugFormat("InactivityMonitor[{0}]: Initial Delay time interval: {1}",
+									   instanceId, initialDelayTime);
+
+					monitorStarted.Value = true;
+					this.asyncTasks = new CompositeTaskRunner("InactivityMonitor[" + instanceId + "].Runner");
+
+					this.asyncErrorTask = new AsyncSignalReadErrorkTask(this, next.RemoteAddress);
+					this.asyncWriteTask = new AsyncWriteTask(this);
+
+					this.asyncTasks.AddTask(this.asyncErrorTask);
+					this.asyncTasks.AddTask(this.asyncWriteTask);
+
+					writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 : readCheckTime;
 
 					Tracer.DebugFormat("InactivityMonitor[{0}]: Write Check time interval: {1}",
-                                       instanceId, writeCheckTime );
-									
-                    this.connectionCheckTimer = new Timer(
-                        new TimerCallback(CheckConnection),
-                        null,
-                        initialDelayTime,
-                        writeCheckTime
-                        );
-                }
-            }
-        }
-
-        private void StopMonitorThreads()
-        {
-            lock(monitor)
-            {
-                if(monitorStarted.CompareAndSet(true, false))
-                {
-                    AutoResetEvent shutdownEvent = new AutoResetEvent(false);
-
-                    // Attempt to wait for the Timer to shutdown, but don't wait
-                    // forever, if they don't shutdown after two seconds, just quit.
-                    this.connectionCheckTimer.Dispose(shutdownEvent);
-                    if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(3000), false))
-                    {
-                        Tracer.WarnFormat("InactivityMonitor[{0}]: Timer Task didn't shutdown
properly.", instanceId);
-                    }
-
-                    this.asyncTasks.RemoveTask(this.asyncWriteTask);
-                    this.asyncTasks.RemoveTask(this.asyncErrorTask);
-
-                    this.asyncTasks.Shutdown();
-                    this.asyncTasks = null;
-                    this.asyncWriteTask = null;
-                    this.asyncErrorTask = null;
-                    this.connectionCheckTimer = null;
-                }
-            }
-
-            Tracer.DebugFormat("InactivityMonitor[{0}]: Stopped Monitor Threads.", instanceId);
-        }
-
-        #region Async Tasks
-        // Task that fires when the TaskRunner is signaled by the ReadCheck Timer Task.
-        class AsyncSignalReadErrorkTask : CompositeTask
-        {
-            private readonly InactivityMonitor parent;
-            private readonly Uri remote;
-            private readonly Atomic<bool> pending = new Atomic<bool>(false);
-
-            public AsyncSignalReadErrorkTask(InactivityMonitor parent, Uri remote)
-            {
-                this.parent = parent;
-                this.remote = remote;
-            }
-
-            public bool IsPending
-            {
-                get { return this.pending.Value; }
-                set { this.pending.Value = value; }
-            }
-
-            public bool Iterate()
-            {
-                if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
-                {
-                    IOException ex = new IOException("Channel was inactive for too long:
" + remote);
-                    this.parent.OnException(parent, ex);
-                }
-
-                return this.pending.Value;
-            }
-        }
-
-        // Task that fires when the TaskRunner is signaled by the WriteCheck Timer Task.
-        class AsyncWriteTask : CompositeTask
-        {
-            private readonly InactivityMonitor parent;
-            private readonly Atomic<bool> pending = new Atomic<bool>(false);
-
-            public AsyncWriteTask(InactivityMonitor parent)
-            {
-                this.parent = parent;
-            }
-
-            public bool IsPending
-            {
-                get { return this.pending.Value; }
-                set { this.pending.Value = value; }
-            }
+									   instanceId, writeCheckTime);
+
+					this.connectionCheckTimer = new Timer(
+						new TimerCallback(CheckConnection),
+						null,
+						initialDelayTime,
+						writeCheckTime
+						);
+				}
+			}
+		}
+
+		private void StopMonitorThreads()
+		{
+			lock(monitor)
+			{
+				if(monitorStarted.CompareAndSet(true, false))
+				{
+					AutoResetEvent shutdownEvent = new AutoResetEvent(false);
+
+					if(null != connectionCheckTimer)
+					{
+						// Attempt to wait for the Timer to shutdown, but don't wait
+						// forever, if they don't shutdown after two seconds, just quit.
+						this.connectionCheckTimer.Dispose(shutdownEvent);
+						if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(3000), false))
+						{
+							Tracer.WarnFormat("InactivityMonitor[{0}]: Timer Task didn't shutdown properly.",
instanceId);
+						}
+
+						this.connectionCheckTimer = null;
+					}
+
+					if(null != this.asyncTasks)
+					{
+						this.asyncTasks.RemoveTask(this.asyncWriteTask);
+						this.asyncTasks.RemoveTask(this.asyncErrorTask);
+
+						this.asyncTasks.Shutdown();
+						this.asyncTasks = null;
+					}
+
+					this.asyncWriteTask = null;
+					this.asyncErrorTask = null;
+				}
+			}
+
+			Tracer.DebugFormat("InactivityMonitor[{0}]: Stopped Monitor Threads.", instanceId);
+		}
+
+		#region Async Tasks
+		// Task that fires when the TaskRunner is signaled by the ReadCheck Timer Task.
+		class AsyncSignalReadErrorkTask : CompositeTask
+		{
+			private readonly InactivityMonitor parent;
+			private readonly Uri remote;
+			private readonly Atomic<bool> pending = new Atomic<bool>(false);
+
+			public AsyncSignalReadErrorkTask(InactivityMonitor parent, Uri remote)
+			{
+				this.parent = parent;
+				this.remote = remote;
+			}
+
+			public bool IsPending
+			{
+				get { return this.pending.Value; }
+				set { this.pending.Value = value; }
+			}
+
+			public bool Iterate()
+			{
+				if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
+				{
+					IOException ex = new IOException("Channel was inactive for too long: " + remote);
+					this.parent.OnException(parent, ex);
+				}
+
+				return this.pending.Value;
+			}
+		}
+
+		// Task that fires when the TaskRunner is signaled by the WriteCheck Timer Task.
+		class AsyncWriteTask : CompositeTask
+		{
+			private readonly InactivityMonitor parent;
+			private readonly Atomic<bool> pending = new Atomic<bool>(false);
+
+			public AsyncWriteTask(InactivityMonitor parent)
+			{
+				this.parent = parent;
+			}
+
+			public bool IsPending
+			{
+				get { return this.pending.Value; }
+				set { this.pending.Value = value; }
+			}
 
-            public bool Iterate()
-            {
+			public bool Iterate()
+			{
 				Tracer.DebugFormat("InactivityMonitor[{0}] perparing for another Write Check", parent.instanceId);
-                if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
-                {
-                    try
-                    {
+				if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
+				{
+					try
+					{
 						Tracer.DebugFormat("InactivityMonitor[{0}] Write Check required sending KeepAlive.",
-                                           parent.instanceId);
-                        KeepAliveInfo info = new KeepAliveInfo();
-                        info.ResponseRequired = this.parent.keepAliveResponseRequired.Value;
-                        this.parent.Oneway(info);
-                    }
-                    catch(IOException e)
-                    {
-                        this.parent.OnException(parent, e);
-                    }
-                }
-
-                return this.pending.Value;
-            }
-        }
-        #endregion
-    }
+										   parent.instanceId);
+						KeepAliveInfo info = new KeepAliveInfo();
+						info.ResponseRequired = this.parent.keepAliveResponseRequired.Value;
+						this.parent.Oneway(info);
+					}
+					catch(IOException e)
+					{
+						this.parent.OnException(parent, e);
+					}
+				}
+
+				return this.pending.Value;
+			}
+		}
+		#endregion
+	}
 
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/NetTxConnectionFactoryTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/NetTxConnectionFactoryTest.cs?rev=1171874&r1=1171873&r2=1171874&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/NetTxConnectionFactoryTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/NetTxConnectionFactoryTest.cs
Sat Sep 17 00:23:26 2011
@@ -222,8 +222,8 @@ namespace Apache.NMS.ActiveMQ.Test
         [Test]
         [TestCase("/var/log/nms/recovery/", true)]
         [TestCase("/var/temp/log/nms/recovery/", false)]
-        [TestCase("C:\\Transactions\\ReceoveryLogs", true)]
-        [TestCase("\\\\ServerName\\Transactions\\ReceoveryLogs", true)]
+        [TestCase("C:\\Transactions\\RecoveryLogs", true)]
+        [TestCase("\\\\ServerName\\Transactions\\RecoveryLogs", true)]
         public void TestConfigureRecoveryPolicyLogger(string location, bool autoCreate)
         {
             string testuri = string.Format("activemq:tcp://${{activemqhost}}:61616" +
@@ -286,8 +286,7 @@ namespace Apache.NMS.ActiveMQ.Test
             [Values("tcp://${activemqhost}:61616?nms.RecoveryPolicy.RecoveryLoggerType=invalid")]
             string baseConnectionURI)
         {
-            INetTxConnectionFactory factory =
-                new NetTxConnectionFactory(NMSTestSupport.ReplaceEnvVar(baseConnectionURI));
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(NMSTestSupport.ReplaceEnvVar(baseConnectionURI));
 
             using(IConnection connection = factory.CreateConnection()){}
         }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs?rev=1171874&r1=1171873&r2=1171874&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs
Sat Sep 17 00:23:26 2011
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Diagnostics;
 using System.Threading;
 using Apache.NMS.Test;
 using Apache.NMS.Util;
@@ -29,17 +30,16 @@ namespace Apache.NMS.ActiveMQ.Test
 		protected static string DESTINATION_NAME = "TestMaxInactivityDuration";
 		protected static string CORRELATION_ID = "MaxInactivityCorrelationID";
 
-		/// <summary>
-		/// The name of the connection configuration that CreateNMSFactory() will load.
-		/// Refer to the nmsprovider-test.config file for the value of this variable.
-		/// </summary>
-		/// <returns></returns>
-		protected override string GetNameTestURI() { return "maxInactivityDurationURI"; }
-
 		[Test]
 		public void TestMaxInactivityDuration()
 		{
-			using(IConnection connection = CreateConnection())
+			string testuri = "activemq:tcp://${activemqhost}:61616" +
+										"?wireFormat.maxInactivityDurationInitialDelay=5000" +
+										"&wireFormat.maxInactivityDuration=10000" +
+										"&connection.asyncClose=false";
+
+			NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(testuri));
+			using(IConnection connection = factory.CreateConnection("", ""))
 			{
 				connection.Start();
 				using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
@@ -72,5 +72,49 @@ namespace Apache.NMS.ActiveMQ.Test
 			request.NMSType = "Test";
 			producer.Send(request);
 		}
+
+		[Test, Sequential]
+		public void TestInactivityMonitorThreadLeak(
+			[Values(0, 1000)]
+			int inactivityDuration)
+		{
+			Process currentProcess = Process.GetCurrentProcess();
+			Tracer.InfoFormat("Beginning thread count: {0}, handle count: {1}", currentProcess.Threads.Count,
currentProcess.HandleCount);
+
+			string testuri = string.Format("activemq:tcp://${{activemqhost}}:61616?wireFormat.maxInactivityDuration={0}",
inactivityDuration);
+	
+			NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(testuri));
+
+			// We measure the initial resource counts, and then allow a certain fudge factor for the
resources
+			// to fluctuate at run-time.  We allow for a certain amount of fluctuation, but if the
counts
+			// grow outside the safe boundaries of delayed garbage collection, then we fail the test.
+			currentProcess = Process.GetCurrentProcess();
+			int beginThreadCount = currentProcess.Threads.Count;
+			int beginHandleCount = currentProcess.HandleCount;
+			int maxThreadGrowth = 10;
+			int maxHandleGrowth = 500;
+
+			for(int i = 0; i < 200; i++)
+			{
+				using(IConnection connection = factory.CreateConnection("ResourceLeakTest", "Password"))
+				{
+					using(ISession session = connection.CreateSession())
+					{
+						IDestination destination = SessionUtil.GetDestination(session, "topic://NMSResourceLeak.TestTopic");
+						using(IMessageConsumer consumer = session.CreateConsumer(destination))
+						{
+							connection.Start();
+						}
+					}
+				}
+
+				currentProcess = Process.GetCurrentProcess();
+				int endThreadCount = currentProcess.Threads.Count;
+				int endHandleCount = currentProcess.HandleCount;
+
+				Assert.Less(endThreadCount, beginThreadCount + maxThreadGrowth, string.Format("Thread
count grew beyond maximum of {0} on iteration #{1}.", maxThreadGrowth, i));
+				Assert.Less(endHandleCount, beginHandleCount + maxHandleGrowth, string.Format("Handle
count grew beyond maximum of {0} on iteration #{1}.", maxHandleGrowth, i));
+			}
+		}
 	}
 }



Mime
View raw message