activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r887420 [3/3] - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk: ./ src/main/csharp/ src/main/csharp/Commands/ src/main/csharp/Threads/
Date Fri, 04 Dec 2009 22:53:42 GMT
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs?rev=887420&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs
Fri Dec  4 22:53:41 2009
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.Stomp.Threads
+{
+    /// <summary>
+    /// A TaskRunner that dedicates a single background thread to running a
+    /// single Task. The thread keeps calling <c>Task.Iterate()</c> while it
+    /// returns true; once it returns false the thread sleeps until
+    /// <see cref="Wakeup"/> or one of the Shutdown overloads is called.
+    /// </summary>
+    public class DedicatedTaskRunner : TaskRunner
+    {
+        // Monitor object guarding the three flags below. lock/Monitor only need
+        // an object reference; a System.Threading.Mutex would merely allocate an
+        // OS wait handle that is never used as a mutex and never disposed.
+        private readonly object mutex = new object();
+        private readonly Thread theThread;
+        private readonly Task task;
+
+        // Set by Run() (in its finally block) once the worker thread is exiting.
+        private bool terminated = false;
+        // Set when a wakeup was requested and not yet consumed by Run().
+        private bool pending = false;
+        // Set by Shutdown(); Run() exits the next time it observes it.
+        private bool shutdown = false;
+
+        /// <summary>
+        /// Creates the runner and immediately starts its background thread.
+        /// </summary>
+        /// <param name="task">The task to iterate; must not be null.</param>
+        /// <exception cref="NullReferenceException">task is null.</exception>
+        public DedicatedTaskRunner(Task task)
+        {
+            if(task == null)
+            {
+                // Exception type kept as-is so that existing callers which
+                // catch it keep working.
+                throw new NullReferenceException("Task was null");
+            }
+
+            this.task = task;
+
+            this.theThread = new Thread(Run);
+            this.theThread.IsBackground = true;
+            this.theThread.Start();
+        }
+
+        /// <summary>
+        /// Asks the worker thread to stop and waits for it to terminate.
+        /// </summary>
+        /// <param name="timeout">
+        /// Maximum time to wait; -1 millisecond means wait without limit. The
+        /// value is passed to <c>Monitor.Wait</c>, which rejects other negative
+        /// values and values above Int32.MaxValue milliseconds.
+        /// </param>
+        public void Shutdown(TimeSpan timeout)
+        {
+            lock(this.mutex)
+            {
+                this.shutdown = true;
+                this.pending = true;
+
+                Monitor.PulseAll(this.mutex);
+
+                // No need to wait if shutdown is called from the thread that
+                // is shutting down.
+                if(Thread.CurrentThread == this.theThread)
+                {
+                    return;
+                }
+
+                bool infinite = (timeout == TimeSpan.FromMilliseconds(Timeout.Infinite));
+                DateTime start = DateTime.UtcNow;
+                TimeSpan remaining = timeout;
+
+                // Wait in a loop: a pulse on the monitor does not by itself
+                // mean the thread terminated (another Shutdown call pulses
+                // too), so re-check the flag and wait for whatever is left of
+                // the timeout.
+                while(!this.terminated)
+                {
+                    Monitor.Wait(this.mutex, remaining);
+
+                    if(infinite)
+                    {
+                        continue;
+                    }
+
+                    remaining = timeout - (DateTime.UtcNow - start);
+
+                    if(remaining <= TimeSpan.Zero)
+                    {
+                        break;
+                    }
+
+                    if(remaining > timeout)
+                    {
+                        // The clock moved backwards; never wait longer than
+                        // the caller asked for in a single call.
+                        remaining = timeout;
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Asks the worker thread to stop and waits, without a time limit,
+        /// for it to terminate.
+        /// </summary>
+        public void Shutdown()
+        {
+            this.Shutdown(TimeSpan.FromMilliseconds(Timeout.Infinite));
+        }
+
+        /// <summary>
+        /// Marks work as pending and wakes the worker thread if it is
+        /// sleeping. Does nothing once Shutdown has been called.
+        /// </summary>
+        public void Wakeup()
+        {
+            lock(this.mutex)
+            {
+                if(this.shutdown)
+                {
+                    return;
+                }
+
+                this.pending = true;
+
+                Monitor.PulseAll(this.mutex);
+            }
+        }
+
+        /// <summary>
+        /// Body of the worker thread: iterates the task until shutdown is
+        /// requested, sleeping whenever the task reports it has no more work.
+        /// </summary>
+        internal void Run()
+        {
+            try
+            {
+                while(true)
+                {
+                    lock(this.mutex)
+                    {
+                        // Consume the wakeup request before iterating, so a
+                        // Wakeup() arriving during Iterate() is not lost.
+                        this.pending = false;
+
+                        if(this.shutdown)
+                        {
+                            return;
+                        }
+                    }
+
+                    if(!this.task.Iterate())
+                    {
+                        // Wait to be notified.
+                        lock(this.mutex)
+                        {
+                            if(this.shutdown)
+                            {
+                                return;
+                            }
+
+                            while(!this.pending)
+                            {
+                                Monitor.Wait(this.mutex);
+                            }
+                        }
+                    }
+                }
+            }
+            catch
+            {
+                // NOTE(review): any exception escaping Iterate() ends the
+                // worker thread silently. Kept as best-effort behaviour;
+                // consider reporting the error to the owner.
+            }
+            finally
+            {
+                // Make sure we notify any waiting threads that the thread
+                // has terminated.
+                lock(this.mutex)
+                {
+                    this.terminated = true;
+                    Monitor.PulseAll(this.mutex);
+                }
+            }
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/Task.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/Task.cs?rev=887420&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/Task.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/Task.cs Fri Dec
 4 22:53:41 2009
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.Stomp.Threads
+{
+    /// <summary>
+    /// Represents a unit of work that may need several calls to Iterate to complete.
+    /// </summary>
+    public interface Task
+    {
+        bool Iterate(); // Runs one iteration. DedicatedTaskRunner calls it again right away when it returns true, and sleeps until woken when it returns false.
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/Task.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/TaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/TaskRunner.cs?rev=887420&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/TaskRunner.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/TaskRunner.cs
Fri Dec  4 22:53:41 2009
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Stomp.Threads
+{
+    /// <summary>
+    /// Allows you to request that a thread execute the associated Task.
+    /// </summary>
+    public interface TaskRunner
+    {
+        void Wakeup(); // Requests that the task be iterated; DedicatedTaskRunner ignores the request after shutdown.
+        void Shutdown(); // Stops the runner; DedicatedTaskRunner waits without a time limit for its thread to end.
+        void Shutdown(TimeSpan timeout); // Stops the runner, waiting at most 'timeout' for it to finish.
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/TaskRunner.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs?rev=887420&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
Fri Dec  4 22:53:41 2009
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.Stomp.Threads
+{
+    /// <summary>
+    /// Manages the thread pool for long running tasks. Long running tasks are not
+    /// always active but when they are active, they may need a few iterations of
+    /// processing for them to become idle. The manager ensures that each task is
+    /// processed but that no one task overtakes the system. This is kind of like
+    /// cooperative multitasking.
+    /// </summary>
+    public class TaskRunnerFactory
+    {
+        protected int maxIterationsPerRun;
+        protected String name;
+        protected ThreadPriority priority;
+        protected bool daemon;
+
+        // When true, CreateTaskRunner hands out DedicatedTaskRunner instances
+        // instead of pooled ones.
+        private bool dedicatedTaskRunner;
+
+        /// <summary>
+        /// Creates a factory named "ActiveMQ Task" with normal priority, daemon
+        /// threads, 1000 iterations per run and pooled task runners.
+        /// </summary>
+        public TaskRunnerFactory()
+        {
+            InitTaskRunnerFactory("ActiveMQ Task", ThreadPriority.Normal, true, 1000, false);
+        }
+
+        /// <summary>
+        /// Creates a factory with the given settings that hands out pooled
+        /// task runners.
+        /// </summary>
+        public TaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun)
+        {
+            InitTaskRunnerFactory(name, priority, daemon, maxIterationsPerRun, false);
+        }
+
+        /// <summary>
+        /// Creates a factory with the given settings.
+        /// </summary>
+        /// <param name="dedicatedTaskRunner">
+        /// True to give every task its own DedicatedTaskRunner, false to use
+        /// pooled task runners.
+        /// </param>
+        public TaskRunnerFactory(String name, ThreadPriority priority, bool daemon,
+                                 int maxIterationsPerRun, bool dedicatedTaskRunner)
+        {
+            InitTaskRunnerFactory(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner);
+        }
+
+        /// <summary>
+        /// Stores the factory settings. The name, priority and daemon values
+        /// are recorded but not read anywhere else in this class.
+        /// </summary>
+        public void InitTaskRunnerFactory(String name, ThreadPriority priority, bool daemon,
+                                          int maxIterationsPerRun, bool dedicatedTaskRunner)
+        {
+            this.name = name;
+            this.priority = priority;
+            this.daemon = daemon;
+            this.maxIterationsPerRun = maxIterationsPerRun;
+
+            // If your OS/CLR combination has a good thread model, you may want to avoid
+            // using a thread pool to run tasks and use a DedicatedTaskRunner instead.
+            this.dedicatedTaskRunner = dedicatedTaskRunner;
+        }
+
+        /// <summary>
+        /// Currently a no-op; the factory holds nothing that needs releasing.
+        /// </summary>
+        public void Shutdown()
+        {
+        }
+
+        /// <summary>
+        /// Creates a runner for the given task: a DedicatedTaskRunner when the
+        /// factory was configured for dedicated runners, otherwise a
+        /// PooledTaskRunner limited to maxIterationsPerRun iterations.
+        /// </summary>
+        /// <param name="task">The task the runner will execute.</param>
+        /// <param name="name">Currently unused.</param>
+        public TaskRunner CreateTaskRunner(Task task, String name)
+        {
+            if(this.dedicatedTaskRunner)
+            {
+                return new DedicatedTaskRunner(task);
+            }
+
+            return new PooledTaskRunner(task, maxIterationsPerRun);
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs?rev=887420&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs
Fri Dec  4 22:53:41 2009
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Apache.NMS.Stomp;
+using Apache.NMS.Stomp.Commands;
+using System.Collections;
+
+namespace Apache.NMS.Stomp
+{
+    /// <summary>
+    /// Kinds of transaction command. TransactionContext casts the member to
+    /// int when it fills in TransactionInfo.Type.
+    /// </summary>
+    public enum TransactionType
+    {
+        Begin    = 0,
+        Commit   = 1,
+        Rollback = 2
+    }
+}
+
+namespace Apache.NMS.Stomp
+{
+    /// <summary>
+    /// Tracks the local transaction of a Session: creates the transaction id,
+    /// sends the begin / commit / rollback TransactionInfo commands over the
+    /// session's connection and notifies the registered ISynchronization
+    /// callbacks around the end of the transaction.
+    /// </summary>
+    public class TransactionContext
+    {
+        // Non-null only while a transaction is in progress.
+        private TransactionId transactionId;
+        private readonly Session session;
+        private readonly ArrayList synchronizations = ArrayList.Synchronized(new ArrayList());
+
+        public TransactionContext(Session session)
+        {
+            this.session = session;
+        }
+
+        /// <summary>True while a transaction id is assigned.</summary>
+        public bool InTransaction
+        {
+            get { return this.transactionId != null; }
+        }
+
+        /// <summary>The current transaction id, or null when not in a transaction.</summary>
+        public TransactionId TransactionId
+        {
+            get { return this.transactionId; }
+        }
+
+        /// <summary>
+        /// Registers a callback to be notified when the transaction ends. The
+        /// list is cleared after every successful Commit or Rollback.
+        /// </summary>
+        public void AddSynchronization(ISynchronization synchronization)
+        {
+            this.synchronizations.Add(synchronization);
+        }
+
+        /// <summary>Unregisters a previously added callback.</summary>
+        public void RemoveSynchronization(ISynchronization synchronization)
+        {
+            this.synchronizations.Remove(synchronization);
+        }
+
+        /// <summary>
+        /// Starts a new transaction unless one is already in progress, in
+        /// which case the call does nothing.
+        /// </summary>
+        public void Begin()
+        {
+            if(!InTransaction)
+            {
+                this.transactionId = this.session.Connection.CreateLocalTransactionId();
+
+                this.session.Connection.Oneway(CreateTransactionInfo(TransactionType.Begin));
+            }
+        }
+
+        /// <summary>
+        /// Rolls the current transaction back: runs the BeforeEnd callbacks,
+        /// sends the rollback command synchronously, then runs the
+        /// AfterRollback callbacks and clears the callback list.
+        /// </summary>
+        /// <exception cref="NMSException">No transaction is in progress.</exception>
+        public void Rollback()
+        {
+            EnsureInTransaction();
+
+            this.BeforeEnd();
+
+            // Build the command before the id is reset below.
+            TransactionInfo info = CreateTransactionInfo(TransactionType.Rollback);
+
+            this.transactionId = null;
+            this.session.Connection.SyncRequest(info);
+
+            this.AfterRollback();
+            this.synchronizations.Clear();
+        }
+
+        /// <summary>
+        /// Commits the current transaction: runs the BeforeEnd callbacks,
+        /// sends the commit command synchronously, then runs the AfterCommit
+        /// callbacks and clears the callback list.
+        /// </summary>
+        /// <exception cref="NMSException">No transaction is in progress.</exception>
+        public void Commit()
+        {
+            EnsureInTransaction();
+
+            this.BeforeEnd();
+
+            // TransactionType declares no CommitOnePhase member; Commit is the
+            // only commit kind defined in this file.
+            TransactionInfo info = CreateTransactionInfo(TransactionType.Commit);
+
+            this.transactionId = null;
+            this.session.Connection.SyncRequest(info);
+
+            this.AfterCommit();
+            this.synchronizations.Clear();
+        }
+
+        /// <summary>Calls BeforeEnd on every registered synchronization.</summary>
+        internal void BeforeEnd()
+        {
+            // Enumerating a synchronized ArrayList is not thread safe by
+            // itself, so hold its SyncRoot for the whole loop.
+            lock(this.synchronizations.SyncRoot)
+            {
+                foreach(ISynchronization synchronization in this.synchronizations)
+                {
+                    synchronization.BeforeEnd();
+                }
+            }
+        }
+
+        /// <summary>Calls AfterCommit on every registered synchronization.</summary>
+        internal void AfterCommit()
+        {
+            lock(this.synchronizations.SyncRoot)
+            {
+                foreach(ISynchronization synchronization in this.synchronizations)
+                {
+                    synchronization.AfterCommit();
+                }
+            }
+        }
+
+        /// <summary>Calls AfterRollback on every registered synchronization.</summary>
+        internal void AfterRollback()
+        {
+            lock(this.synchronizations.SyncRoot)
+            {
+                foreach(ISynchronization synchronization in this.synchronizations)
+                {
+                    synchronization.AfterRollback();
+                }
+            }
+        }
+
+        // Throws when no transaction is in progress.
+        private void EnsureInTransaction()
+        {
+            if(!InTransaction)
+            {
+                throw new NMSException("Invalid State: Not Currently in a Transaction");
+            }
+        }
+
+        // Builds the command of the given type for the current transaction id.
+        private TransactionInfo CreateTransactionInfo(TransactionType type)
+        {
+            TransactionInfo info = new TransactionInfo();
+            info.ConnectionId = this.session.Connection.ConnectionId;
+            info.TransactionId = this.transactionId;
+            info.Type = (int) type;
+            return info;
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj?rev=887420&r1=887419&r2=887420&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj Fri Dec  4 22:53:41
2009
@@ -93,6 +93,19 @@
     <Compile Include="src\main\csharp\ConnectionMetaData.cs" />
     <Compile Include="src\main\csharp\Commands\MessageDispatch.cs" />
     <Compile Include="src\main\csharp\Commands\ShutdownInfo.cs" />
+    <Compile Include="src\main\csharp\IDispatcher.cs" />
+    <Compile Include="src\main\csharp\ISynchronization.cs" />
+    <Compile Include="src\main\csharp\MessageConsumer.cs" />
+    <Compile Include="src\main\csharp\MessageProducer.cs" />
+    <Compile Include="src\main\csharp\PrefetchPolicy.cs" />
+    <Compile Include="src\main\csharp\Session.cs" />
+    <Compile Include="src\main\csharp\SessionExecutor.cs" />
+    <Compile Include="src\main\csharp\TransactionContext.cs" />
+    <Compile Include="src\main\csharp\Threads\DedicatedTaskRunner.cs" />
+    <Compile Include="src\main\csharp\Threads\Task.cs" />
+    <Compile Include="src\main\csharp\Threads\TaskRunner.cs" />
+    <Compile Include="src\main\csharp\Threads\TaskRunnerFactory.cs" />
+    <Compile Include="src\main\csharp\Commands\DataStructureTypes.cs" />
   </ItemGroup>
   <ItemGroup>
     <None Include="keyfile\NMSKey.snk" />



Mime
View raw message