activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r893560 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/Threads/ main/csharp/Transport/ main/csharp/Transport/Mock/ test/csharp/Threads/ test/csharp/Transport/Inactivity/
Date Wed, 23 Dec 2009 16:06:03 GMT
Author: tabish
Date: Wed Dec 23 16:06:02 2009
New Revision: 893560

URL: http://svn.apache.org/viewvc?rev=893560&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-212

Update to inactivity monitor and addition of Composite Task Runner to create a mechanism for
controlling execution of asynchronous tasks.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTask.cs
  (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
  (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/CompositeTaskRunnerTest.cs
  (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Inactivity/InactivityMonitorTest.cs

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTask.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTask.cs?rev=893560&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTask.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTask.cs
Wed Dec 23 16:06:02 2009
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+	/// <summary>
+	/// A Composite task is one of N tasks that can be managed by a 
+	/// CompositTaskRunner instance.  The CompositeTaskRunner checks each
+	/// task when its wakeup method is called to determine if the Task has
+	/// any work it needs to complete, if no tasks have any pending work 
+	/// then the CompositeTaskRunner can return to its sleep state until 
+	/// the next time its wakeup method is called or it is shut down.
+	/// </summary>
+	public interface CompositeTask : Task
+	{
+		/// <summary>
+		/// Indicates if this Task has any pending work.
+		/// </summary>
+		bool IsPending{ get; }		
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTask.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs?rev=893560&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
Wed Dec 23 16:06:02 2009
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+    /// <summary>
+    /// A TaskRunner that dedicates a single thread to running a single Task.
+    /// </summary>
+    public class CompositeTaskRunner : TaskRunner
+    {
+        private readonly Mutex mutex = new Mutex();
+        private Thread theThread = null;
+        private LinkedList<CompositeTask> tasks = new LinkedList<CompositeTask>();
+
+        private bool terminated = false;
+        private bool pending = false;
+        private bool shutdown = false;
+        
+        public CompositeTaskRunner()
+        {
+            this.theThread = new Thread(Run);
+            this.theThread.IsBackground = true;
+            this.theThread.Start();
+        }
+		
+		public void AddTask(CompositeTask task)
+		{
+			lock(mutex)
+			{
+				this.tasks.AddLast(task);
+				this.Wakeup();
+			}
+		}
+
+		public void RemoveTask(CompositeTask task)
+		{
+			lock(mutex)
+			{
+				this.tasks.Remove(task);
+				this.Wakeup();
+			}
+		}
+
+        public void Shutdown(TimeSpan timeout)
+        {
+            lock(mutex) 
+            {
+                this.shutdown = true;
+                this.pending = true;
+                
+                Monitor.PulseAll(this.mutex);
+
+                // Wait till the thread stops ( no need to wait if shutdown
+                // is called from thread that is shutting down)
+                if(Thread.CurrentThread != this.theThread && !this.terminated) 
+                {
+                    Monitor.Wait(this.mutex, timeout);
+                }
+            }
+        }
+
+        public void Shutdown()
+        {
+            this.Shutdown(TimeSpan.FromMilliseconds(-1));
+        }
+
+        public void Wakeup()
+        {
+            lock(mutex)
+            {
+                if(this.shutdown)
+                {
+                    return;
+                }
+                
+                this.pending = true;
+                
+                Monitor.PulseAll(this.mutex);
+            }            
+        }
+
+        internal void Run()
+        {
+            try 
+            {
+                while(true) 
+                {
+                    lock(this.mutex) 
+                    {
+                        pending = false;
+                        
+                        if(this.shutdown)
+                        {
+                            return;
+                        }
+                    }
+
+                    if(!this.Iterate())
+                    {
+                        // wait to be notified.
+                        lock(this.mutex)
+                        {
+                            if(this.shutdown) 
+                            {
+                                return;
+                            }
+                            
+                            while(!this.pending) 
+                            {
+                                Monitor.Wait(this.mutex);
+                            }
+                        }
+                    }
+                }
+            }
+            catch
+            {
+            }
+            finally
+            {        
+                // Make sure we notify any waiting threads that thread
+                // has terminated.
+                lock(this.mutex)
+                {
+                    this.terminated = true;
+                    Monitor.PulseAll(this.mutex);
+                }
+            }
+        }
+		
+		private bool Iterate()
+		{
+			lock(mutex)
+			{
+				foreach(CompositeTask task in this.tasks)
+				{
+					if(task.IsPending)
+					{
+						task.Iterate();
+
+						// Always return true here so that we can check the next
+						// task in the list to see if its done.
+						return true;
+					}
+				}
+			}
+			
+			return false;
+		}
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs?rev=893560&r1=893559&r2=893560&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs Wed
Dec 23 16:06:02 2009
@@ -22,6 +22,15 @@
 	/// </summary>
 	public interface Task
 	{
+		/// <summary>
+		/// Performs some portion of the work that this Task object is
+		/// assigned to complete.  When the task is entirely finished this
+		/// method should return false. 
+		/// </summary>
+		/// <returns>
+		/// A <see cref="System.Boolean"/> this indicates if this Task should 
+		/// be run again.
+		/// </returns>
 		bool Iterate();
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=893560&r1=893559&r2=893560&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
Wed Dec 23 16:06:02 2009
@@ -17,6 +17,7 @@
 
 using System;
 using System.Threading;
+using Apache.NMS.ActiveMQ.Threads;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.Util;
 
@@ -38,13 +39,19 @@
         private Atomic<bool> inRead = new Atomic<bool>(false);
         private Atomic<bool> inWrite = new Atomic<bool>(false);
 
+		private CompositeTaskRunner asyncTasks;
+        private AsyncSignalReadErrorkTask asyncErrorTask;
+        private AsyncWriteTask asyncWriteTask;
+		
         private Mutex monitor = new Mutex();
 
         private Timer readCheckTimer;
         private Timer writeCheckTimer;
 
-        private WriteChecker writeChecker;
-        private ReadChecker readChecker;
+        private DateTime lastReadCheckTime;
+
+        //private WriteChecker writeChecker;
+        //private ReadChecker readChecker;
 
         private long readCheckTime;
         public long ReadCheckTime
@@ -86,63 +93,68 @@
             : base(next)
         {
             Tracer.Debug("Creating Inactivity Monitor");
+            Console.WriteLine("Creating Inactivity Monitor");
         }
 
         #region WriteCheck Related
         /// <summary>
         /// Check the write to the broker
         /// </summary>
-        public void WriteCheck()
+        public void WriteCheck(object state)
         {
-            if(inWrite.Value)
+            Console.WriteLine("Entered WriteCheck");
+
+            if(this.inWrite.Value || this.failed.Value)
             {
+                Console.WriteLine("In write or already failed.");
                 return;
             }
 
             if(!commandSent.Value)
             {
                 Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
-                ThreadPool.QueueUserWorkItem(new WaitCallback(SendKeepAlive));
+                Console.WriteLine("No Message sent since last write check. Sending a KeepAliveInfo");
+                this.asyncWriteTask.IsPending = true;
+                this.asyncTasks.Wakeup();
             }
             else
             {
                 Tracer.Debug("Message sent since last write check. Resetting flag");
+                Console.WriteLine("Message sent since last write check. Resetting flag");
             }
 
             commandSent.Value = false;
         }
+        #endregion
 
-        private void SendKeepAlive(object state)
+        #region ReadCheck Related
+        public void ReadCheck(object state)
         {
-            if(monitorStarted.Value)
+            Console.WriteLine("Entered ReadCheck");
+            DateTime now = DateTime.Now;
+            TimeSpan elapsed = now - this.lastReadCheckTime;
+
+            if(!AllowReadCheck(elapsed))
             {
-                try
-                {
-                    KeepAliveInfo info = new KeepAliveInfo();
-                    info.ResponseRequired = keepAliveResponseRequired.Value;
-                    Oneway(info);
-                }
-                catch(IOException exception)
-                {
-                    OnException(this, exception);
-                }
+                Console.WriteLine("A read check is not yet allowed.");
+                return;
             }
-        }
-        #endregion
 
-        #region ReadCheck Related
-        public void ReadCheck()
-        {
-            if(inRead.Value)
+            this.lastReadCheckTime = now;
+
+            if(this.inRead.Value || this.failed.Value)
             {
-                Tracer.Debug("A receive is in progress");
+                Tracer.Debug("A receive is in progress or already failed.");
+                Console.WriteLine("A receive is in progress");
                 return;
             }
 
             if(!commandReceived.Value)
             {
                 Tracer.Debug("No message received since last read check! Sending an InactivityException!");
-                ThreadPool.QueueUserWorkItem(new WaitCallback(SendInactivityException));
+                Console.WriteLine("No message received since last read check! Sending an
InactivityException!");
+                this.asyncErrorTask.IsPending = true;
+                this.asyncTasks.Wakeup();
             }
             else
             {
@@ -150,20 +162,15 @@
             }
         }
 
-        private void SendInactivityException(object state)
-        {
-            OnException(this, new IOException("Channel was inactive for too long."));
-        }
-
         /// <summary>
         /// Checks if we should allow the read check(if less than 90% of the read
         /// check time elapsed then we dont do the readcheck
         /// </summary>
         /// <param name="elapsed"></param>
         /// <returns></returns>
-        public bool AllowReadCheck(long elapsed)
+        public bool AllowReadCheck(TimeSpan elapsed)
         {
-            return (elapsed > (readCheckTime * 9 / 10));
+            return (elapsed.TotalMilliseconds > (readCheckTime * 9 / 10));
         }
         #endregion
 
@@ -256,6 +263,7 @@
             if(failed.CompareAndSet(false, true))
             {
                 Tracer.Debug("Exception received in the Inactivity Monitor: " + command.ToString());
+                Console.WriteLine("Exception received in the Inactivity Monitor: " + command.Message);
                 StopMonitorThreads();
                 base.OnException(sender, command);
             }
@@ -269,10 +277,12 @@
                 {
                     return;
                 }
+
                 if(localWireFormatInfo == null)
                 {
                     return;
                 }
+
                 if(remoteWireFormatInfo == null)
                 {
                     return;
@@ -287,22 +297,28 @@
                         localWireFormatInfo.MaxInactivityDurationInitialDelay,
                         remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
 
+                this.asyncTasks = new CompositeTaskRunner();
+
+                this.asyncErrorTask = new AsyncSignalReadErrorkTask(this, next.RemoteAddress);
+                this.asyncWriteTask = new AsyncWriteTask(this);
+
+                this.asyncTasks.AddTask(this.asyncErrorTask);
+                this.asyncTasks.AddTask(this.asyncWriteTask);
+
                 if(readCheckTime > 0)
                 {
                     monitorStarted.Value = true;
-                    writeChecker = new WriteChecker(this);
-                    readChecker = new ReadChecker(this);
 
                     writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 : readCheckTime;
 
                     writeCheckTimer = new Timer(
-                        new TimerCallback(writeChecker.Check),
+                        new TimerCallback(WriteCheck),
                         null,
                         initialDelayTime,
                         writeCheckTime
                         );
                     readCheckTimer = new Timer(
-                        new TimerCallback(readChecker.Check),
+                        new TimerCallback(ReadCheck),
                         null,
                         initialDelayTime,
                         readCheckTime
@@ -317,59 +333,92 @@
             {
                 if(monitorStarted.CompareAndSet(true, false))
                 {
-                    readCheckTimer.Dispose();
-                    writeCheckTimer.Dispose();
+                    AutoResetEvent shutdownEvent = new AutoResetEvent(false);
+
+                    this.readCheckTimer.Dispose(shutdownEvent);
+                    shutdownEvent.WaitOne();
+                    this.writeCheckTimer.Dispose(shutdownEvent);
+                    shutdownEvent.WaitOne();
+
+					this.asyncTasks.Shutdown();
+                    this.asyncTasks = null;
+                    this.asyncWriteTask = null;
+                    this.asyncErrorTask = null;
                 }
             }
         }
-    }
-
-    class WriteChecker
-    {
-        private readonly InactivityMonitor parent;
 
-        public WriteChecker(InactivityMonitor parent)
+        #region Async Tasks
+        // Task that fires when the TaskRunner is signaled by the ReadCheck Timer Task.
+        class AsyncSignalReadErrorkTask : CompositeTask
         {
-            if(parent == null)
+            private InactivityMonitor parent;
+            private Uri remote;
+            private Atomic<bool> pending = new Atomic<bool>(false);
+    
+            public AsyncSignalReadErrorkTask(InactivityMonitor parent, Uri remote)
             {
-                throw new NullReferenceException("WriteChecker created with a NULL parent.");
+                this.parent = parent;
+                this.remote = remote;
             }
 
-            this.parent = parent;
-        }
-
-        public void Check(object state)
-        {
-            this.parent.WriteCheck();
-        }
-    }
-
-    class ReadChecker
-    {
-        private readonly InactivityMonitor parent;
-        private long lastRunTime;
-
-        public ReadChecker(InactivityMonitor parent)
-        {
-            if(parent == null)
+            public bool IsPending
             {
-                throw new NullReferenceException("ReadChecker created with a null parent");
+                get { return this.pending.Value; }
+                set { this.pending.Value = value; }
+            }
+    
+            public bool Iterate()
+            {
+                if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
+                {
+                    Console.WriteLine("AsyncSignalReadErrorkTask - Sending Pending Read Error");
+                    IOException ex = new IOException("Channel was inactive for too long:
" + remote);
+                    this.parent.OnException(parent, ex);
+                }
+    
+                return this.pending.Value;
             }
-            this.parent = parent;
         }
-
-        public void Check(object state)
+    
+        // Task that fires when the TaskRunner is signaled by the WriteCheck Timer Task.
+        class AsyncWriteTask : CompositeTask
         {
-            long now = DateUtils.ToJavaTimeUtc(DateTime.UtcNow);
-            long elapsed = now - lastRunTime;
-            if(!parent.AllowReadCheck(elapsed))
+            private InactivityMonitor parent;
+            private Atomic<bool> pending = new Atomic<bool>(false);
+    
+            public AsyncWriteTask(InactivityMonitor parent)
             {
-                return;
+                this.parent = parent;
+            }
+    
+            public bool IsPending
+            {
+                get { return this.pending.Value; }
+                set { this.pending.Value = value; }
+            }
+    
+            public bool Iterate()
+            {
+                if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
+                {
+                    try
+                    {
+                        Console.WriteLine("AsyncWriteTask - Sending Pending KeepAlive");
+                        KeepAliveInfo info = new KeepAliveInfo();
+                        info.ResponseRequired = this.parent.keepAliveResponseRequired.Value;
+                        this.parent.Oneway(info);
+                    }
+                    catch(IOException e)
+                    {
+                        this.parent.OnException(parent, e);
+                    }
+                }
+    
+                return this.pending.Value;
             }
-            lastRunTime = now;
-
-            // Invoke the parent check routine.
-            this.parent.ReadCheck();
         }
+        #endregion
     }
+
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=893560&r1=893559&r2=893560&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
Wed Dec 23 16:06:02 2009
@@ -379,7 +379,7 @@
 
         public Uri RemoteAddress
         {
-            get{ return null; }
+            get{ return new Uri("mock://mock"); }
         }
         
         #endregion

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/CompositeTaskRunnerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/CompositeTaskRunnerTest.cs?rev=893560&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/CompositeTaskRunnerTest.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/CompositeTaskRunnerTest.cs
Wed Dec 23 16:06:02 2009
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using NUnit.Framework;
+using Apache.NMS.ActiveMQ.Threads;
+
+namespace Apache.NMS.ActiveMQ.Test.Threads
+{
+    [TestFixture]
+    public class CompositeTaskRunnerTest
+    {
+
+        class CountingTask : CompositeTask
+        {
+            private int count;
+            private int goal;
+            private string name;
+
+            public CountingTask( string name, int goal )
+            {
+                this.name = name;
+                this.goal = goal;
+            }
+
+            public string Name
+            {
+                get { return name; }
+            }
+
+            public int Count
+            {
+                get { return count; }
+            }
+        
+            public bool IsPending
+            {
+                get { return count != goal; }
+            }
+
+            public bool Iterate()
+            {
+                Console.WriteLine(name + ": Running iteration " + count );
+                return !( ++count == goal );
+            }
+        }
+
+        [Test]
+        public void TestCompositeTaskRunner()
+        {
+        
+            int attempts = 0;
+        
+            CompositeTaskRunner runner = new CompositeTaskRunner();
+
+            CountingTask task1 = new CountingTask("task1", 100);
+            CountingTask task2 = new CountingTask("task2", 200);
+
+            runner.AddTask( task1 );
+            runner.AddTask( task2 );
+
+            runner.Wakeup();
+        
+            while( attempts++ != 10 )
+            {
+                Thread.Sleep( 1000 );
+
+                if(task1.Count == 100 && task2.Count == 200)
+                {
+                    break;
+                }
+            }
+        
+            Assert.IsTrue(task1.Count == 100);
+            Assert.IsTrue(task2.Count == 200);
+
+            runner.RemoveTask(task1);
+            runner.RemoveTask(task2);
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/CompositeTaskRunnerTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Inactivity/InactivityMonitorTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Inactivity/InactivityMonitorTest.cs?rev=893560&r1=893559&r2=893560&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Inactivity/InactivityMonitorTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Inactivity/InactivityMonitorTest.cs
Wed Dec 23 16:06:02 2009
@@ -21,6 +21,7 @@
 using System.Collections;
 using System.Collections.Generic;
 using Apache.NMS;
+using Apache.NMS.Util;
 using Apache.NMS.ActiveMQ.Transport;
 using Apache.NMS.ActiveMQ.Transport.Mock;
 using Apache.NMS.ActiveMQ.Commands;
@@ -30,7 +31,7 @@
 
 namespace Apache.NMS.ActiveMQ.Test
 {
-    //[TestFixture]
+    [TestFixture]
     public class InactivityMonitorTest
     {
         private List<Command> received;
@@ -50,7 +51,7 @@
             received.Add( command );
         }
 
-        //[SetUp]
+        [SetUp]
         public void SetUp()
         {
             this.received = new List<Command>();
@@ -68,7 +69,7 @@
             this.localWireFormatInfo.TightEncodingEnabled = false;
         }
 
-        //[Test]
+        [Test]
         public void TestCreate()
         {
             InactivityMonitor monitor = new InactivityMonitor( this.transport );
@@ -80,12 +81,13 @@
             Assert.IsTrue( monitor.IsDisposed == false );
         }
 
-        //[Test]
+        [Test]
         public void TestReadTimeout()
         {
             InactivityMonitor monitor = new InactivityMonitor( this.transport );
 
             monitor.Exception += new ExceptionHandler(OnException);
+            monitor.Command += new CommandHandler(OnCommand);
 
             // Send the local one for the monitor to record.
             monitor.Oneway( this.localWireFormatInfo );
@@ -101,7 +103,7 @@
             Assert.IsTrue( this.exceptions.Count > 0 );
         }
 
-        //[Test]
+        [Test]
         public void TestWriteMessageFail()
         {
             this.transport.FailOnKeepAliveInfoSends = true ;
@@ -110,6 +112,8 @@
             InactivityMonitor monitor = new InactivityMonitor( this.transport );
 
             monitor.Exception += new ExceptionHandler(OnException);
+            monitor.Command += new CommandHandler(OnCommand);
+            monitor.Start();
 
             // Send the local one for the monitor to record.
             monitor.Oneway( this.localWireFormatInfo );
@@ -119,21 +123,29 @@
             ActiveMQMessage message = new ActiveMQMessage();
             this.transport.InjectCommand( message );
 
+            Thread.Sleep( 2000 );
+
             // Should not have timed out on Read yet.
             Assert.IsTrue( this.exceptions.Count == 0 );
 
-            Thread.Sleep( 8000 );
+            for(int ix = 0; ix < 4; ix++)
+            {
+                this.transport.InjectCommand( message );
+                Thread.Sleep( 2000 );
+            }
 
             // Channel should have been inactive for to long.
             Assert.IsTrue( this.exceptions.Count > 0 );
         }
 
-        //[Test]
+        [Test]
         public void TestNonFailureSendCase()
         {
             InactivityMonitor monitor = new InactivityMonitor( this.transport );
 
             monitor.Exception += new ExceptionHandler(OnException);
+            monitor.Command += new CommandHandler(OnCommand);
+            monitor.Start();
 
             // Send the local one for the monitor to record.
             monitor.Oneway( this.localWireFormatInfo );



Mime
View raw message