activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r826784 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/Util/MessageDispatchChannel.cs test/csharp/Util/MessageDispatchChannelTest.cs
Date Mon, 19 Oct 2009 19:40:30 GMT
Author: tabish
Date: Mon Oct 19 19:40:29 2009
New Revision: 826784

URL: http://svn.apache.org/viewvc?rev=826784&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQNET-176

Add a MessageDispatchChannel object for use in refactoring needed for this issue.  Add unit
tests for it as well.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
  (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Util/MessageDispatchChannelTest.cs
  (with props)

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs?rev=826784&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
Mon Oct 19 19:40:29 2009
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ
+{
+    public class MessageDispatchChannel
+    {
+        private readonly Mutex mutex = new Mutex();
+        private Atomic<bool> closed = new Atomic<bool>();
+        private Atomic<bool> running = new Atomic<bool>();
+        private LinkedList<MessageDispatch> channel = new LinkedList<MessageDispatch>();
+        
+        public MessageDispatchChannel()
+        {
+        }
+
+        #region Properties
+        
+        public bool Closed
+        {
+            get{ return this.closed.Value; }
+            set{ this.closed.Value = value; }
+        }
+
+        public bool Running
+        {
+            get{ return this.running.Value; }
+            set{ this.running.Value = value; }
+        }
+
+        public bool Empty
+        {
+            get
+            {
+                lock(mutex)
+                {
+                    return channel.Count == 0;
+                }
+            }
+        }
+
+        public long Count
+        {
+            get
+            {
+                lock(mutex)
+                {
+                    return channel.Count;
+                }
+            }
+        }
+
+        #endregion
+
+        public void Start()
+        {
+            lock(this.mutex)
+            {
+                if(!Closed)
+                {
+                    this.running.Value = true;
+                    Monitor.PulseAll(this.mutex);
+                }
+            }
+        }
+
+        public void Stop()
+        {
+            lock(mutex)
+            {
+                this.running.Value = false;
+                Monitor.PulseAll(this.mutex);
+            }
+        }
+
+        public void Close()
+        {
+            lock(mutex)
+            {
+                if(!Closed)
+                {
+                    this.running.Value = false;
+                    this.closed.Value = true;
+                }          
+
+                Monitor.PulseAll(this.mutex);
+            }            
+        }
+        
+        public void Enqueue(MessageDispatch dispatch)
+        {
+            lock(this.mutex)
+            {
+                this.channel.AddLast(dispatch);
+                Monitor.Pulse(this.mutex);
+            }
+        }
+
+        public void EnqueueFirst(MessageDispatch dispatch)
+        {
+            lock(this.mutex)
+            {
+                this.channel.AddFirst(dispatch);
+                Monitor.Pulse(this.mutex);
+            }
+        }
+
+        public MessageDispatch Dequeue(TimeSpan timeout)
+        {
+            lock(this.mutex)
+            {
+                // Wait until the channel is ready to deliver messages.
+                while( timeout != TimeSpan.Zero && !Closed && ( Empty ||
!Running ) )
+                {
+                    Monitor.Wait(this.mutex, timeout);
+                    break;
+                }
+        
+                if( Closed || !Running || Empty ) 
+                {
+                    return null;
+                }
+        
+                return DequeueNoWait();                      
+            }
+        }
+
+        public MessageDispatch DequeueNoWait()
+        {
+            MessageDispatch result = null;
+            
+            lock(this.mutex)
+            {
+                if( Closed || !Running || Empty ) 
+                {
+                    return null;
+                }
+                
+                result = channel.First.Value;
+                this.channel.RemoveFirst();
+            }
+
+            return result;
+        }
+
+        public MessageDispatch Peek()
+        {
+            lock(this.mutex)
+            {
+                if( Closed || !Running || Empty ) 
+                {
+                    return null;
+                }
+                
+                return channel.First.Value;
+            }
+        }
+
+        public void Clear()
+        {
+            lock(mutex)
+            {
+                this.channel.Clear();
+            }
+        }
+
+        public MessageDispatch[] RemoveAll()
+        {
+            MessageDispatch[] result;
+            
+            lock(mutex)
+            {
+                result = new MessageDispatch[this.Count];
+                channel.CopyTo(result, 0);
+                channel.Clear();
+            }
+
+            return result;
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Util/MessageDispatchChannelTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Util/MessageDispatchChannelTest.cs?rev=826784&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Util/MessageDispatchChannelTest.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Util/MessageDispatchChannelTest.cs
Mon Oct 19 19:40:29 2009
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ.Util;
+using Apache.NMS.ActiveMQ.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    [TestFixture]
+    public class MessageDispatchChannelTest
+    {
+        [Test]
+        public void TestCtor()
+        {
+            MessageDispatchChannel channel = new MessageDispatchChannel();
+            Assert.IsTrue( channel.Running == false );
+            Assert.IsTrue( channel.Empty == true );
+            Assert.IsTrue( channel.Count == 0 );
+            Assert.IsTrue( channel.Closed == false );
+        }
+        
+        [Test]
+        public void TestStart() 
+        {
+            MessageDispatchChannel channel = new MessageDispatchChannel();
+            channel.Start();
+            Assert.IsTrue( channel.Running == true );
+        }
+        
+        [Test]
+        public void TestStop() 
+        {
+            MessageDispatchChannel channel = new MessageDispatchChannel();
+            channel.Start();
+            Assert.IsTrue( channel.Running == true );
+            channel.Stop();
+            Assert.IsTrue( channel.Running == false );
+        }
+        
+        [Test]
+        public void TestClose() 
+        {
+            MessageDispatchChannel channel = new MessageDispatchChannel();
+            channel.Start();
+            Assert.IsTrue( channel.Running == true );
+            Assert.IsTrue( channel.Closed == false );
+            channel.Close();
+            Assert.IsTrue( channel.Running == false );
+            Assert.IsTrue( channel.Closed == true );
+            channel.Start();
+            Assert.IsTrue( channel.Running == false );
+            Assert.IsTrue( channel.Closed == true );
+        }
+        
+        [Test]
+        public void TestEnqueue() 
+        {
+            MessageDispatchChannel channel = new MessageDispatchChannel();
+            MessageDispatch dispatch1 = new MessageDispatch();
+            MessageDispatch dispatch2 = new MessageDispatch();
+        
+            Assert.IsTrue( channel.Empty == true );
+            Assert.IsTrue( channel.Count == 0 );
+        
+            channel.Enqueue( dispatch1 );
+        
+            Assert.IsTrue( channel.Empty == false );
+            Assert.IsTrue( channel.Count == 1 );
+        
+            channel.Enqueue( dispatch2 );
+        
+            Assert.IsTrue( channel.Empty == false );
+            Assert.IsTrue( channel.Count == 2 );
+        }
+        
+        [Test]
+        public void TestEnqueueFront() 
+        {
+            MessageDispatchChannel channel = new MessageDispatchChannel();
+            MessageDispatch dispatch1 = new MessageDispatch();
+            MessageDispatch dispatch2 = new MessageDispatch();
+        
+            channel.Start();
+        
+            Assert.IsTrue( channel.Empty == true );
+            Assert.IsTrue( channel.Count == 0 );
+        
+            channel.EnqueueFirst( dispatch1 );
+        
+            Assert.IsTrue( channel.Empty == false );
+            Assert.IsTrue( channel.Count == 1 );
+        
+            channel.EnqueueFirst( dispatch2 );
+        
+            Assert.IsTrue( channel.Empty == false );
+            Assert.IsTrue( channel.Count == 2 );
+        
+            Assert.IsTrue( channel.DequeueNoWait() == dispatch2 );
+            Assert.IsTrue( channel.DequeueNoWait() == dispatch1 );
+        }
+        
+        [Test]
+        public void TestPeek() 
+        {
+            MessageDispatchChannel channel = new MessageDispatchChannel();
+            MessageDispatch dispatch1 = new MessageDispatch();
+            MessageDispatch dispatch2 = new MessageDispatch();
+        
+            Assert.IsTrue( channel.Empty == true );
+            Assert.IsTrue( channel.Count == 0 );
+        
+            channel.EnqueueFirst( dispatch1 );
+        
+            Assert.IsTrue( channel.Empty == false );
+            Assert.IsTrue( channel.Count == 1 );
+        
+            channel.EnqueueFirst( dispatch2 );
+        
+            Assert.IsTrue( channel.Empty == false );
+            Assert.IsTrue( channel.Count == 2 );
+        
+            Assert.IsTrue( channel.Peek() == null );
+        
+            channel.Start();
+        
+            Assert.IsTrue( channel.Peek() == dispatch2 );
+            Assert.IsTrue( channel.DequeueNoWait() == dispatch2 );
+            Assert.IsTrue( channel.Peek() == dispatch1 );
+            Assert.IsTrue( channel.DequeueNoWait() == dispatch1 );
+        }
+        
+        [Test]
+        public void TestDequeueNoWait() 
+        {
+            MessageDispatchChannel channel = new MessageDispatchChannel();
+        
+            MessageDispatch dispatch1 = new MessageDispatch();
+            MessageDispatch dispatch2 = new MessageDispatch();
+            MessageDispatch dispatch3 = new MessageDispatch();
+        
+            Assert.IsTrue( channel.Running == false );
+            Assert.IsTrue( channel.DequeueNoWait() == null );
+        
+            channel.Enqueue( dispatch1 );
+            channel.Enqueue( dispatch2 );
+            channel.Enqueue( dispatch3 );
+        
+            Assert.IsTrue( channel.DequeueNoWait() == null );
+            channel.Start();
+            Assert.IsTrue( channel.Running == true );
+        
+            Assert.IsTrue( channel.Empty == false );
+            Assert.IsTrue( channel.Count == 3 );
+            Assert.IsTrue( channel.DequeueNoWait() == dispatch1 );
+            Assert.IsTrue( channel.DequeueNoWait() == dispatch2 );
+            Assert.IsTrue( channel.DequeueNoWait() == dispatch3 );
+        
+            Assert.IsTrue( channel.Count == 0 );
+            Assert.IsTrue( channel.Empty == true );
+        }
+        
+        [Test]
+        public void TestDequeue() 
+        {
+            MessageDispatchChannel channel = new MessageDispatchChannel();
+        
+            MessageDispatch dispatch1 = new MessageDispatch();
+            MessageDispatch dispatch2 = new MessageDispatch();
+            MessageDispatch dispatch3 = new MessageDispatch();
+        
+            channel.Start();
+            Assert.IsTrue( channel.Running == true );
+        
+            DateTime timeStarted = DateTime.Now;
+        
+            Assert.IsTrue( channel.Dequeue(TimeSpan.FromMilliseconds(1000)) == null );
+
+            DateTime timeFinished = DateTime.Now;
+
+            TimeSpan elapsed = timeFinished - timeStarted;
+            Assert.IsTrue( elapsed.TotalMilliseconds >= 999 );
+        
+            channel.Enqueue( dispatch1 );
+            channel.Enqueue( dispatch2 );
+            channel.Enqueue( dispatch3 );
+            Assert.IsTrue( channel.Empty == false );
+            Assert.IsTrue( channel.Count == 3 );
+            Assert.IsTrue( channel.Dequeue( TimeSpan.FromMilliseconds(Timeout.Infinite) )
== dispatch1 );
+            Assert.IsTrue( channel.Dequeue( TimeSpan.Zero ) == dispatch2 );
+            Assert.IsTrue( channel.Dequeue( TimeSpan.FromMilliseconds(1000) ) == dispatch3
);
+        
+            Assert.IsTrue( channel.Count == 0 );
+            Assert.IsTrue( channel.Empty == true );
+        }
+        
+        [Test]
+        public void TestRemoveAll() 
+        {
+            MessageDispatchChannel channel = new MessageDispatchChannel();
+        
+            MessageDispatch dispatch1 = new MessageDispatch();
+            MessageDispatch dispatch2 = new MessageDispatch();
+            MessageDispatch dispatch3 = new MessageDispatch();
+        
+            channel.Enqueue( dispatch1 );
+            channel.Enqueue( dispatch2 );
+            channel.Enqueue( dispatch3 );
+        
+            channel.Start();
+            Assert.IsTrue( channel.Running == true );
+            Assert.IsTrue( channel.Empty == false );
+            Assert.IsTrue( channel.Count == 3 );
+            Assert.IsTrue( channel.RemoveAll().Length == 3 );
+            Assert.IsTrue( channel.Count == 0 );
+            Assert.IsTrue( channel.Empty == true );
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Util/MessageDispatchChannelTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message