activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r984591 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: ./ Util/
Date Wed, 11 Aug 2010 21:35:26 GMT
Author: tabish
Date: Wed Aug 11 21:35:26 2010
New Revision: 984591

URL: http://svn.apache.org/viewvc?rev=984591&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQNET-270

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
  (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
  (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=984591&r1=984590&r2=984591&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Wed Aug
11 21:35:26 2010
@@ -44,6 +44,7 @@ namespace Apache.NMS.ActiveMQ
         private bool sendAcksAsync = false;
         private bool dispatchAsync = true;
         private int producerWindowSize = 0;
+        private bool messagePrioritySupported=true;
 
         private bool userSpecifiedClientID;
         private readonly Uri brokerUri;
@@ -238,6 +239,17 @@ namespace Apache.NMS.ActiveMQ
             set { this.useCompression = value; }
         }
 
+        /// <summary>
+        /// Indicate whether or not the resources of this Connection should support the
+        /// Message Priority value of incoming messages and dispatch them accordingly.
+        /// When disabled Message are always dispatched to Consumers in FIFO order.
+        /// </summary>
+        public bool MessagePrioritySupported
+        {
+            get { return this.messagePrioritySupported; }
+            set { this.messagePrioritySupported = value; }
+        }
+
         public IConnectionMetaData MetaData
         {
             get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=984591&r1=984590&r2=984591&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
Wed Aug 11 21:35:26 2010
@@ -53,6 +53,7 @@ namespace Apache.NMS.ActiveMQ
 		private int producerWindowSize = 0;
         private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
 		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+        private bool messagePrioritySupported=true;
 
         private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
         private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
@@ -255,7 +256,13 @@ namespace Apache.NMS.ActiveMQ
             get { return this.dispatchAsync; }
             set { this.dispatchAsync = value; }
         }
-		
+
+        public bool MessagePrioritySupported
+        {
+            get { return this.messagePrioritySupported; }
+            set { this.messagePrioritySupported = value; }
+        }
+
 		public int RequestTimeout
 		{
 			get { return (int)this.requestTimeout.TotalMilliseconds; }
@@ -359,6 +366,7 @@ namespace Apache.NMS.ActiveMQ
             connection.UseCompression = this.useCompression;
 			connection.RequestTimeout = this.requestTimeout;
 			connection.ProducerWindowSize = this.producerWindowSize;
+            connection.MessagePrioritySupported = this.messagePrioritySupported;
             connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
             connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
             connection.CompressionPolicy = this.compressionPolicy.Clone() as ICompressionPolicy;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=984591&r1=984590&r2=984591&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Wed Aug 11 21:35:26 2010
@@ -39,7 +39,7 @@ namespace Apache.NMS.ActiveMQ
 	/// </summary>
 	public class MessageConsumer : IMessageConsumer, IDispatcher
 	{
-		private readonly MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
+		private readonly MessageDispatchChannel unconsumedMessages;
 		private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
 		private readonly ConsumerInfo info;
 		private Session session;
@@ -79,7 +79,16 @@ namespace Apache.NMS.ActiveMQ
             
 			this.session = session;
             this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
-			
+
+            if(session.Connection.MessagePrioritySupported)
+            {
+                this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
+            }
+            else
+            {
+                this.unconsumedMessages = new FifoMessageDispatchChannel();
+            }
+
             this.info = new ConsumerInfo();
             this.info.ConsumerId = id;
             this.info.Destination = destination;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs?rev=984591&r1=984590&r2=984591&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs
Wed Aug 11 21:35:26 2010
@@ -25,7 +25,7 @@ namespace Apache.NMS.ActiveMQ
 {
     public class SessionExecutor : Threads.Task
     {
-        private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
+        private MessageDispatchChannel messageQueue = null;
         private TaskRunner taskRunner = null;
 
         private Session session = null;
@@ -35,6 +35,15 @@ namespace Apache.NMS.ActiveMQ
         {
             this.session = session;
             this.consumers = consumers;
+
+            if(this.session.Connection != null && this.session.Connection.MessagePrioritySupported)
+            {
+               this.messageQueue = new SimplePriorityMessageDispatchChannel();
+            }
+            else
+            {
+                this.messageQueue = new FifoMessageDispatchChannel();
+            }
         }
 
         ~SessionExecutor()

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs?rev=984591&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
Wed Aug 11 21:35:26 2010
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ
+{
+    /// <summary>
+    /// A FIFO based MessageDispatchChannel.
+    /// </summary>
+    public class FifoMessageDispatchChannel : MessageDispatchChannel
+    {
+        private readonly Mutex mutex = new Mutex();
+        private bool closed;
+        private bool running;
+        private LinkedList<MessageDispatch> channel = new LinkedList<MessageDispatch>();
+
+        #region Properties
+
+        public object SyncRoot
+        {
+            get{ return this.mutex; }
+        }
+        
+        public bool Closed
+        {
+            get 
+            {
+                lock(this.mutex)
+                {
+                    return this.closed; 
+                }
+            }
+            
+            set 
+            {
+                lock(this.mutex)
+                {
+                    this.closed = value;
+                }
+            }
+        }
+
+        public bool Running
+        {
+            get
+            {
+                lock(this.mutex)
+                {
+                    return this.running;
+                }
+            }
+            
+            set
+            {
+                lock(this.mutex)
+                {
+                    this.running = value;
+                }
+            }
+        }
+
+        public bool Empty
+        {
+            get
+            {
+                lock(mutex)
+                {
+                    return channel.Count == 0;
+                }
+            }
+        }
+
+        public long Count
+        {
+            get
+            {
+                lock(mutex)
+                {
+                    return channel.Count;
+                }
+            }
+        }
+
+        #endregion
+
+        public void Start()
+        {
+            lock(this.mutex)
+            {
+                if(!Closed)
+                {
+                    this.running = true;
+                    Monitor.PulseAll(this.mutex);
+                }
+            }
+        }
+
+        public void Stop()
+        {
+            lock(mutex)
+            {
+                this.running = false;
+                Monitor.PulseAll(this.mutex);
+            }
+        }
+
+        public void Close()
+        {
+            lock(mutex)
+            {
+                if(!Closed)
+                {
+                    this.running = false;
+                    this.closed = true;
+                }          
+
+                Monitor.PulseAll(this.mutex);
+            }            
+        }
+        
+        public void Enqueue(MessageDispatch dispatch)
+        {
+            lock(this.mutex)
+            {
+                this.channel.AddLast(dispatch);
+                Monitor.Pulse(this.mutex);
+            }
+        }
+
+        public void EnqueueFirst(MessageDispatch dispatch)
+        {
+            lock(this.mutex)
+            {
+                this.channel.AddFirst(dispatch);
+                Monitor.Pulse(this.mutex);
+            }
+        }
+
+        public MessageDispatch Dequeue(TimeSpan timeout)
+        {
+            lock(this.mutex)
+            {
+                // Wait until the channel is ready to deliver messages.
+                if( timeout != TimeSpan.Zero && !Closed && ( Empty || !Running
) )
+                {
+                    Monitor.Wait(this.mutex, timeout);
+                }
+        
+                if( Closed || !Running || Empty ) 
+                {
+                    return null;
+                }
+        
+                return DequeueNoWait();                      
+            }
+        }
+
+        public MessageDispatch DequeueNoWait()
+        {
+            MessageDispatch result = null;
+            
+            lock(this.mutex)
+            {
+                if( Closed || !Running || Empty ) 
+                {
+                    return null;
+                }
+                
+                result = channel.First.Value;
+                this.channel.RemoveFirst();
+            }
+
+            return result;
+        }
+
+        public MessageDispatch Peek()
+        {
+            lock(this.mutex)
+            {
+                if( Closed || !Running || Empty ) 
+                {
+                    return null;
+                }
+                
+                return channel.First.Value;
+            }
+        }
+
+        public void Clear()
+        {
+            lock(mutex)
+            {
+                this.channel.Clear();
+            }
+        }
+
+        public MessageDispatch[] RemoveAll()
+        {
+            MessageDispatch[] result;
+            
+            lock(mutex)
+            {
+                result = new MessageDispatch[this.Count];
+                channel.CopyTo(result, 0);
+                channel.Clear();
+            }
+
+            return result;
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs?rev=984591&r1=984590&r2=984591&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
Wed Aug 11 21:35:26 2010
@@ -22,205 +22,60 @@ using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ
 {
-    public class MessageDispatchChannel
+    /// <summary>
+    /// Defines an interface for a Message Channel used to dispatch incoming
+    /// Messages to a Session or MessageConsumer.  The implementation controls
+    /// how the messages are dequeued from the channel, one option is for a
+    /// FIFO ordering while another might be to sort the Message's based on the
+    /// set Message Priority.
+    /// </summary>
+    public interface MessageDispatchChannel
     {
-        private readonly Mutex mutex = new Mutex();
-        private bool closed;
-        private bool running;
-        private LinkedList<MessageDispatch> channel = new LinkedList<MessageDispatch>();
-        
-        #region Properties
-
-        public object SyncRoot
+        object SyncRoot
         {
-            get{ return this.mutex; }
+            get;
         }
         
-        public bool Closed
-        {
-            get 
-            {
-                lock(this.mutex)
-                {
-                    return this.closed; 
-                }
-            }
-            
-            set 
-            {
-                lock(this.mutex)
-                {
-                    this.closed = value;
-                }
-            }
-        }
-
-        public bool Running
-        {
-            get
-            {
-                lock(this.mutex)
-                {
-                    return this.running;
-                }
-            }
-            
-            set
-            {
-                lock(this.mutex)
-                {
-                    this.running = value;
-                }
-            }
-        }
-
-        public bool Empty
+        bool Closed
         {
-            get
-            {
-                lock(mutex)
-                {
-                    return channel.Count == 0;
-                }
-            }
+            get;
+            set;
         }
 
-        public long Count
+        bool Running
         {
-            get
-            {
-                lock(mutex)
-                {
-                    return channel.Count;
-                }
-            }
+            get;
+            set;
         }
 
-        #endregion
-
-        public void Start()
+        bool Empty
         {
-            lock(this.mutex)
-            {
-                if(!Closed)
-                {
-                    this.running = true;
-                    Monitor.PulseAll(this.mutex);
-                }
-            }
+            get;
         }
 
-        public void Stop()
+        long Count
         {
-            lock(mutex)
-            {
-                this.running = false;
-                Monitor.PulseAll(this.mutex);
-            }
+            get;
         }
 
-        public void Close()
-        {
-            lock(mutex)
-            {
-                if(!Closed)
-                {
-                    this.running = false;
-                    this.closed = true;
-                }          
-
-                Monitor.PulseAll(this.mutex);
-            }            
-        }
-        
-        public void Enqueue(MessageDispatch dispatch)
-        {
-            lock(this.mutex)
-            {
-                this.channel.AddLast(dispatch);
-                Monitor.Pulse(this.mutex);
-            }
-        }
+        void Start();
 
-        public void EnqueueFirst(MessageDispatch dispatch)
-        {
-            lock(this.mutex)
-            {
-                this.channel.AddFirst(dispatch);
-                Monitor.Pulse(this.mutex);
-            }
-        }
+        void Stop();
 
-        public MessageDispatch Dequeue(TimeSpan timeout)
-        {
-            lock(this.mutex)
-            {
-                // Wait until the channel is ready to deliver messages.
-                if( timeout != TimeSpan.Zero && !Closed && ( Empty || !Running
) )
-                {
-                    Monitor.Wait(this.mutex, timeout);
-                }
+        void Close();
         
-                if( Closed || !Running || Empty ) 
-                {
-                    return null;
-                }
-        
-                return DequeueNoWait();                      
-            }
-        }
+        void Enqueue(MessageDispatch dispatch);
 
-        public MessageDispatch DequeueNoWait()
-        {
-            MessageDispatch result = null;
-            
-            lock(this.mutex)
-            {
-                if( Closed || !Running || Empty ) 
-                {
-                    return null;
-                }
-                
-                result = channel.First.Value;
-                this.channel.RemoveFirst();
-            }
+        void EnqueueFirst(MessageDispatch dispatch);
 
-            return result;
-        }
+        MessageDispatch Dequeue(TimeSpan timeout);
 
-        public MessageDispatch Peek()
-        {
-            lock(this.mutex)
-            {
-                if( Closed || !Running || Empty ) 
-                {
-                    return null;
-                }
-                
-                return channel.First.Value;
-            }
-        }
+        MessageDispatch DequeueNoWait();
 
-        public void Clear()
-        {
-            lock(mutex)
-            {
-                this.channel.Clear();
-            }
-        }
+        MessageDispatch Peek();
 
-        public MessageDispatch[] RemoveAll()
-        {
-            MessageDispatch[] result;
-            
-            lock(mutex)
-            {
-                result = new MessageDispatch[this.Count];
-                channel.CopyTo(result, 0);
-                channel.Clear();
-            }
+        void Clear();
 
-            return result;
-        }
+        MessageDispatch[] RemoveAll();
     }
 }

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs?rev=984591&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
Wed Aug 11 21:35:26 2010
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ
+{
+    public class SimplePriorityMessageDispatchChannel : MessageDispatchChannel
+    {
+        public const int MAX_PRIORITY = 10;
+
+        private readonly Mutex mutex = new Mutex();
+        private bool closed;
+        private bool running;
+        private LinkedList<MessageDispatch>[] channels = new LinkedList<MessageDispatch>[MAX_PRIORITY];
+        private int size;
+
+        public SimplePriorityMessageDispatchChannel()
+        {
+            for(int i = 0; i < MAX_PRIORITY; ++i)
+            {
+                channels[i] = new LinkedList<MessageDispatch>();
+            }
+        }
+
+        #region Properties
+
+        public object SyncRoot
+        {
+            get{ return this.mutex; }
+        }
+        
+        public bool Closed
+        {
+            get 
+            {
+                lock(this.mutex)
+                {
+                    return this.closed; 
+                }
+            }
+            
+            set 
+            {
+                lock(this.mutex)
+                {
+                    this.closed = value;
+                }
+            }
+        }
+
+        public bool Running
+        {
+            get
+            {
+                lock(this.mutex)
+                {
+                    return this.running;
+                }
+            }
+            
+            set
+            {
+                lock(this.mutex)
+                {
+                    this.running = value;
+                }
+            }
+        }
+
+        public bool Empty
+        {
+            get
+            {
+                lock(mutex)
+                {
+                    return this.size == 0;
+                }
+            }
+        }
+
+        public long Count
+        {
+            get
+            {
+                lock(mutex)
+                {
+                    return this.size;
+                }
+            }
+        }
+
+        #endregion
+
+        public void Start()
+        {
+            lock(this.mutex)
+            {
+                if(!Closed)
+                {
+                    this.running = true;
+                    Monitor.PulseAll(this.mutex);
+                }
+            }
+        }
+
+        public void Stop()
+        {
+            lock(mutex)
+            {
+                this.running = false;
+                Monitor.PulseAll(this.mutex);
+            }
+        }
+
+        public void Close()
+        {
+            lock(mutex)
+            {
+                if(!Closed)
+                {
+                    this.running = false;
+                    this.closed = true;
+                }          
+
+                Monitor.PulseAll(this.mutex);
+            }            
+        }
+        
+        public void Enqueue(MessageDispatch dispatch)
+        {
+            lock(this.mutex)
+            {
+                GetList(dispatch).AddLast(dispatch);
+                this.size++;
+                Monitor.Pulse(this.mutex);
+            }
+        }
+
+        public void EnqueueFirst(MessageDispatch dispatch)
+        {
+            lock(this.mutex)
+            {
+                GetList(dispatch).AddFirst(dispatch);
+                this.size++;
+                Monitor.Pulse(this.mutex);
+            }
+        }
+
+        public MessageDispatch Dequeue(TimeSpan timeout)
+        {
+            lock(this.mutex)
+            {
+                // Wait until the channel is ready to deliver messages.
+                if( timeout != TimeSpan.Zero && !Closed && ( Empty || !Running
) )
+                {
+                    Monitor.Wait(this.mutex, timeout);
+                }
+        
+                if( Closed || !Running || Empty ) 
+                {
+                    return null;
+                }
+        
+                return RemoveFirst();
+            }
+        }
+
+        public MessageDispatch DequeueNoWait()
+        {
+            MessageDispatch result = null;
+            
+            lock(this.mutex)
+            {
+                if( Closed || !Running || Empty ) 
+                {
+                    return null;
+                }
+                
+                result = RemoveFirst();
+            }
+
+            return result;
+        }
+
+        public MessageDispatch Peek()
+        {
+            lock(this.mutex)
+            {
+                if( Closed || !Running || Empty ) 
+                {
+                    return null;
+                }
+                
+                return GetFirst();
+            }
+        }
+
+        public void Clear()
+        {
+            lock(mutex)
+            {
+                foreach(LinkedList<MessageDispatch> list in channels)
+                {
+                    list.Clear();
+                }
+            }
+        }
+
+        public MessageDispatch[] RemoveAll()
+        {
+            MessageDispatch[] result;
+            
+            lock(mutex)
+            {
+                result = new MessageDispatch[this.size];
+                int copyPos = 0;
+
+                for(int i = MAX_PRIORITY - 1; i >= 0; i--)
+                {
+                    LinkedList<MessageDispatch> list = channels[i];
+                    list.CopyTo(result, copyPos);
+                    copyPos += list.Count;
+                    size -= list.Count;
+                    list.Clear();
+                }
+            }
+
+            return result;
+        }
+
+        protected int getPriority(MessageDispatch message)
+        {
+            int priority = (int) NMSConstants.defaultPriority;
+
+            if(message.Message != null)
+            {
+                priority = Math.Max((int) message.Message.Priority, 0);
+                priority = Math.Min(priority, 9);
+            }
+
+            return priority;
+        }
+
+        protected LinkedList<MessageDispatch> GetList(MessageDispatch md)
+        {
+            return channels[getPriority(md)];
+        }
+
+        private MessageDispatch RemoveFirst()
+        {
+            if(this.size > 0)
+            {
+                for(int i = MAX_PRIORITY - 1; i >= 0; i--)
+                {
+                    LinkedList<MessageDispatch> list = channels[i];
+                    if(list.Count != 0)
+                    {
+                        this.size--;
+                        MessageDispatch dispatch = list.First.Value;
+                        list.RemoveFirst();
+                        return dispatch;
+                    }
+                }
+            }
+            return null;
+        }
+
+        private MessageDispatch GetFirst()
+        {
+            if(this.size > 0)
+            {
+                for(int i = MAX_PRIORITY - 1; i >= 0; i--)
+                {
+                    LinkedList<MessageDispatch> list = channels[i];
+                    if(list.Count != 0)
+                    {
+                        return list.First.Value;
+                    }
+                }
+            }
+            return null;
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message