From commits-return-14919-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Fri Nov 05 21:43:28 2010 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 38532 invoked from network); 5 Nov 2010 21:43:27 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 5 Nov 2010 21:43:27 -0000 Received: (qmail 5880 invoked by uid 500); 5 Nov 2010 21:43:59 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 5826 invoked by uid 500); 5 Nov 2010 21:43:59 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 5819 invoked by uid 99); 5 Nov 2010 21:43:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Nov 2010 21:43:59 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Nov 2010 21:43:58 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DA80723888CD; Fri, 5 Nov 2010 21:42:44 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1031823 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads: CompositeTask.cs CompositeTaskRunner.cs Date: Fri, 05 Nov 2010 21:42:44 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101105214244.DA80723888CD@eris.apache.org> Author: tabish Date: Fri Nov 5 21:42:44 2010 New Revision: 1031823 URL: http://svn.apache.org/viewvc?rev=1031823&view=rev Log: Port the CompositeTaskRunner for use with Stomp 1.1 and InactivityMonitor updates to do read and write checks. Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs (with props) activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs (with props) Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs?rev=1031823&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs Fri Nov 5 21:42:44 2010 @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.NMS.Stomp.Threads +{ + /// + /// A Composite task is one of N tasks that can be managed by a + /// CompositTaskRunner instance. The CompositeTaskRunner checks each + /// task when its wakeup method is called to determine if the Task has + /// any work it needs to complete, if no tasks have any pending work + /// then the CompositeTaskRunner can return to its sleep state until + /// the next time its wakeup method is called or it is shut down. + /// + public interface CompositeTask : Task + { + /// + /// Indicates if this Task has any pending work. + /// + bool IsPending{ get; } + } +} Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs?rev=1031823&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs Fri Nov 5 21:42:44 2010 @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Threading; + +namespace Apache.NMS.Stomp.Threads +{ + /// + /// A TaskRunner that dedicates a single thread to running a single Task. + /// + public class CompositeTaskRunner : TaskRunner + { + private readonly Mutex mutex = new Mutex(); + private readonly AutoResetEvent waiter = new AutoResetEvent(false); + private readonly ManualResetEvent isShutdown = new ManualResetEvent(true); + + private readonly Thread theThread = null; + private readonly LinkedList tasks = new LinkedList(); + + private bool terminated = false; + private bool pending = false; + private bool shutdown = false; + + public CompositeTaskRunner() + { + this.theThread = new Thread(Run) {IsBackground = true}; + this.theThread.Start(); + } + + public void AddTask(CompositeTask task) + { + lock(mutex) + { + this.tasks.AddLast(task); + this.Wakeup(); + } + } + + public void RemoveTask(CompositeTask task) + { + lock(mutex) + { + this.tasks.Remove(task); + this.Wakeup(); + } + } + + public void Shutdown() + { + Monitor.Enter(this.mutex); + + this.shutdown = true; + this.pending = true; + + this.waiter.Set(); + + // Wait till the thread stops ( no need to wait if shutdown + // is called from thread that is shutting down) + if(Thread.CurrentThread != this.theThread && !this.terminated) + { + Monitor.Exit(this.mutex); + this.isShutdown.WaitOne(); + } + else + { + Monitor.Exit(this.mutex); + } + } + + public void Shutdown(TimeSpan timeout) + { + Monitor.Enter(this.mutex); + + this.shutdown = true; + this.pending = true; + + this.waiter.Set(); + + // Wait till the thread stops ( no need to wait if shutdown + // is called from thread that is shutting down) + if(Thread.CurrentThread != this.theThread && !this.terminated) + { + Monitor.Exit(this.mutex); + this.isShutdown.WaitOne((int)timeout.Milliseconds, false); + } + else + { + Monitor.Exit(this.mutex); + } + } + + public void Wakeup() + { + lock(mutex) + { + if(this.shutdown) + { + return; + } + + this.pending = true; + + Monitor.PulseAll(this.mutex); + } + } + + internal void Run() + { + lock(this.mutex) + { + this.isShutdown.Reset(); + } + + try + { + while(true) + { + lock(this.mutex) + { + pending = false; + + if(this.shutdown) + { + return; + } + } + + if(!this.Iterate()) + { + // wait to be notified. + Monitor.Enter(this.mutex); + if(this.shutdown) + { + return; + } + + while(!this.pending) + { + Monitor.Exit(this.mutex); + this.waiter.WaitOne(); + Monitor.Enter(this.mutex); + } + Monitor.Exit(this.mutex); + } + } + } + catch + { + } + finally + { + // Make sure we notify any waiting threads that thread + // has terminated. + Monitor.Enter(this.mutex); + this.terminated = true; + Monitor.Exit(this.mutex); + this.isShutdown.Set(); + } + } + + private bool Iterate() + { + lock(mutex) + { + foreach(CompositeTask task in this.tasks) + { + if(task.IsPending) + { + task.Iterate(); + + // Always return true here so that we can check the next + // task in the list to see if its done. + return true; + } + } + } + + return false; + } + } +} Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs ------------------------------------------------------------------------------ svn:executable = *