activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1031823 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads: CompositeTask.cs CompositeTaskRunner.cs
Date Fri, 05 Nov 2010 21:42:44 GMT
Author: tabish
Date: Fri Nov  5 21:42:44 2010
New Revision: 1031823

URL: http://svn.apache.org/viewvc?rev=1031823&view=rev
Log:
Port the CompositeTaskRunner for use with Stomp 1.1 and InactivityMonitor updates to do read
and write checks.

Added:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs
  (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
  (with props)

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs?rev=1031823&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs
Fri Nov  5 21:42:44 2010
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.Stomp.Threads
+{
+	/// <summary>
+	/// A Composite task is one of N tasks that can be managed by a 
+	/// CompositTaskRunner instance.  The CompositeTaskRunner checks each
+	/// task when its wakeup method is called to determine if the Task has
+	/// any work it needs to complete, if no tasks have any pending work 
+	/// then the CompositeTaskRunner can return to its sleep state until 
+	/// the next time its wakeup method is called or it is shut down.
+	/// </summary>
+	public interface CompositeTask : Task
+	{
+		/// <summary>
+		/// Indicates if this Task has any pending work.
+		/// </summary>
+		bool IsPending{ get; }		
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs?rev=1031823&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
Fri Nov  5 21:42:44 2010
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.NMS.Stomp.Threads
+{
+    /// <summary>
+    /// A TaskRunner that dedicates a single thread to running a single Task.
+    /// </summary>
+    public class CompositeTaskRunner : TaskRunner
+    {
+        private readonly Mutex mutex = new Mutex();
+        private readonly AutoResetEvent waiter = new AutoResetEvent(false);
+        private readonly ManualResetEvent isShutdown = new ManualResetEvent(true);
+
+        private readonly Thread theThread = null;
+        private readonly LinkedList<CompositeTask> tasks = new LinkedList<CompositeTask>();
+
+        private bool terminated = false;
+        private bool pending = false;
+        private bool shutdown = false;
+        
+        public CompositeTaskRunner()
+        {
+            this.theThread = new Thread(Run) {IsBackground = true};
+            this.theThread.Start();
+        }
+		
+		public void AddTask(CompositeTask task)
+		{
+			lock(mutex)
+			{
+				this.tasks.AddLast(task);
+				this.Wakeup();
+			}
+		}
+
+		public void RemoveTask(CompositeTask task)
+		{
+			lock(mutex)
+			{
+				this.tasks.Remove(task);
+				this.Wakeup();
+			}
+		}
+
+        public void Shutdown()
+        {
+            Monitor.Enter(this.mutex);
+
+            this.shutdown = true;
+            this.pending = true;
+
+            this.waiter.Set();
+
+            // Wait till the thread stops ( no need to wait if shutdown
+            // is called from thread that is shutting down)
+            if(Thread.CurrentThread != this.theThread && !this.terminated)
+            {
+                Monitor.Exit(this.mutex);
+                this.isShutdown.WaitOne();
+            }
+            else
+            {
+                Monitor.Exit(this.mutex);
+            }
+        }
+
+        public void Shutdown(TimeSpan timeout)
+        {
+            Monitor.Enter(this.mutex);
+
+            this.shutdown = true;
+            this.pending = true;
+
+            this.waiter.Set();
+
+            // Wait till the thread stops ( no need to wait if shutdown
+            // is called from thread that is shutting down)
+            if(Thread.CurrentThread != this.theThread && !this.terminated)
+            {
+                Monitor.Exit(this.mutex);
+                this.isShutdown.WaitOne((int)timeout.Milliseconds, false);
+            }
+            else
+            {
+                Monitor.Exit(this.mutex);
+            }
+        }
+
+        public void Wakeup()
+        {
+            lock(mutex)
+            {
+                if(this.shutdown)
+                {
+                    return;
+                }
+                
+                this.pending = true;
+                
+                Monitor.PulseAll(this.mutex);
+            }            
+        }
+
+        internal void Run()
+        {
+            lock(this.mutex)
+            {
+                this.isShutdown.Reset();
+            }
+
+            try 
+            {
+                while(true) 
+                {
+                    lock(this.mutex) 
+                    {
+                        pending = false;
+                        
+                        if(this.shutdown)
+                        {
+                            return;
+                        }
+                    }
+
+                    if(!this.Iterate())
+                    {
+                        // wait to be notified.
+                        Monitor.Enter(this.mutex);
+                        if(this.shutdown)
+                        {
+                            return;
+                        }
+
+                        while(!this.pending)
+                        {
+                            Monitor.Exit(this.mutex);
+                            this.waiter.WaitOne();
+                            Monitor.Enter(this.mutex);
+                        }
+                        Monitor.Exit(this.mutex);
+                    }
+                }
+            }
+            catch
+            {
+            }
+            finally
+            {        
+                // Make sure we notify any waiting threads that thread
+                // has terminated.
+                Monitor.Enter(this.mutex);
+                this.terminated = true;
+                Monitor.Exit(this.mutex);
+                this.isShutdown.Set();
+            }
+        }
+		
+		private bool Iterate()
+		{
+			lock(mutex)
+			{
+				foreach(CompositeTask task in this.tasks)
+				{
+					if(task.IsPending)
+					{
+						task.Iterate();
+
+						// Always return true here so that we can check the next
+						// task in the list to see if its done.
+						return true;
+					}
+				}
+			}
+			
+			return false;
+		}
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message