activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r831404 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: Connection.cs ConnectionFactory.cs PrefetchPolicy.cs Session.cs
Date Fri, 30 Oct 2009 17:58:53 GMT
Author: tabish
Date: Fri Oct 30 17:58:53 2009
New Revision: 831404

URL: http://svn.apache.org/viewvc?rev=831404&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-203

Added PrefetchPolicy for use in configuration of the Prefetch limits of Consumers created
on a Connection.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/PrefetchPolicy.cs 
 (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=831404&r1=831403&r2=831404&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Fri Oct
30 17:58:53 2009
@@ -56,6 +56,7 @@
         private ConnectionMetaData metaData = null;
         private bool disposed = false;
         private IRedeliveryPolicy redeliveryPolicy;
+        private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
 
         public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
         {
@@ -242,6 +243,12 @@
             get { return this.redeliveryPolicy; }
             set { this.redeliveryPolicy = value; }
         }
+
+        public PrefetchPolicy PrefetchPolicy
+        {
+            get { return this.prefetchPolicy; }
+            set { this.prefetchPolicy = value; }
+        }
         
         #endregion
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=831404&r1=831403&r2=831404&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
Fri Oct 30 17:58:53 2009
@@ -37,7 +37,9 @@
 		private string connectionUserName;
 		private string connectionPassword;
 		private string clientId;
+        
         private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+        private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
 
 		static ConnectionFactory()
 		{
@@ -102,7 +104,9 @@
 			URISupport.CompositeData c = URISupport.parseComposite(uri);
 			URISupport.SetProperties(connection, c.Parameters, "connection.");
 
-            connection.RedeliveryPolicy = this.RedeliveryPolicy;
+            connection.RedeliveryPolicy = this.redeliveryPolicy;
+            connection.PrefetchPolicy = this.prefetchPolicy;
+            
 			connection.ITransport.Start();
 			return connection;
 		}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/PrefetchPolicy.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/PrefetchPolicy.cs?rev=831404&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/PrefetchPolicy.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/PrefetchPolicy.cs Fri
Oct 30 17:58:53 2009
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ
+{
+    /// <summary>
+    /// Class used to define the various limits that should be used for the Prefetch
+    /// limit on destination based on the type of Destination in use.
+    /// </summary>
+    public class PrefetchPolicy
+    {
+        public const int MAX_PREFETCH_SIZE = Int16.MaxValue - 1;
+        public const int DEFAULT_QUEUE_PREFETCH = 1000;
+        public const int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
+        public const int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
+        public const int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;
+        
+        private int queuePrefetch;
+        private int queueBrowserPrefetch;
+        private int topicPrefetch;
+        private int durableTopicPrefetch;
+        private int maximumPendingMessageLimit;
+        
+        public PrefetchPolicy()
+        {
+            this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
+            this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
+            this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
+            this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
+        }
+
+        public int QueuePrefetch
+        {
+            get { return this.queuePrefetch; }
+            set { this.queuePrefetch = RestrictToMaximum(value); }
+        }
+
+        public int QueueBrowserPrefetch
+        {
+            get { return this.queueBrowserPrefetch; }
+            set { this.queueBrowserPrefetch = RestrictToMaximum(value); }
+        }
+
+        public int TopicPrefetch
+        {
+            get { return this.topicPrefetch; }
+            set { this.topicPrefetch = RestrictToMaximum(value); }
+        }
+
+        public int DurableTopicPrefetch
+        {
+            get { return this.durableTopicPrefetch; }
+            set { this.durableTopicPrefetch = RestrictToMaximum(value); }
+        }
+
+        public int MaximumPendingMessageLimit
+        {
+            get { return this.maximumPendingMessageLimit; }
+            set { this.maximumPendingMessageLimit = value; }
+        }
+        
+        public void SetAll(int value)
+        {
+            this.queuePrefetch = value;
+            this.queueBrowserPrefetch = value;
+            this.topicPrefetch = value;
+            this.durableTopicPrefetch = value;
+        }
+
+        private int RestrictToMaximum(int value)
+        {
+            return System.Math.Min(value, MAX_PREFETCH_SIZE);
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/PrefetchPolicy.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=831404&r1=831403&r2=831404&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Fri Oct
30 17:58:53 2009
@@ -41,8 +41,6 @@
         private TransactionContext transactionContext;
         private Connection connection;
 
-        private int prefetchSize;
-        private int maximumPendingMessageLimit;
         private bool dispatchAsync;
         private bool exclusive;
         private bool retroactive;
@@ -65,7 +63,6 @@
             this.info = info;
             this.acknowledgementMode = acknowledgementMode;
             this.requestTimeout = connection.RequestTimeout;
-            this.PrefetchSize = 1000;
 
             if(acknowledgementMode == AcknowledgementMode.Transactional)
             {
@@ -88,8 +85,7 @@
         /// </summary>
         public int PrefetchSize
         {
-            get{ return this.prefetchSize; }
-            set{ this.prefetchSize = value; }
+            set{ this.connection.PrefetchPolicy.SetAll(value); }
         }
 
         /// <summary>
@@ -100,8 +96,7 @@
         /// </summary>
         public int MaximumPendingMessageLimit
         {
-            get{ return this.maximumPendingMessageLimit; }
-            set{ this.maximumPendingMessageLimit = value; }
+            set{ this.connection.PrefetchPolicy.MaximumPendingMessageLimit = value; }
         }
 
         /// <summary>
@@ -428,6 +423,7 @@
             ConsumerId consumerId = command.ConsumerId;
             command.SubscriptionName = name;
             command.NoLocal = noLocal;
+            command.PrefetchSize = this.connection.PrefetchPolicy.DurableTopicPrefetch;
             MessageConsumer consumer = null;
 
             // Registered with Connection before we register at the broker.
@@ -687,13 +683,21 @@
             answer.ConsumerId = id;
             answer.Destination = ActiveMQDestination.Transform(destination);
             answer.Selector = selector;
-            answer.PrefetchSize = this.PrefetchSize;
             answer.Priority = this.Priority;
             answer.Exclusive = this.Exclusive;
             answer.DispatchAsync = this.DispatchAsync;
             answer.Retroactive = this.Retroactive;
-            answer.MaximumPendingMessageLimit = this.MaximumPendingMessageLimit;
+            answer.MaximumPendingMessageLimit = this.connection.PrefetchPolicy.MaximumPendingMessageLimit;
 
+            if(destination is ITopic || destination is ITemporaryTopic)
+            {
+                answer.PrefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
+            }
+            else if(destination is IQueue || destination is ITemporaryQueue)
+            {
+                answer.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
+            }
+            
             // If the destination contained a URI query, then use it to set public properties
             // on the ConsumerInfo
             ActiveMQDestination amqDestination = destination as ActiveMQDestination;



Mime
View raw message