activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r382753 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/region/policy/ command/
Date Fri, 03 Mar 2006 10:34:35 GMT
Author: jstrachan
Date: Fri Mar  3 02:34:33 2006
New Revision: 382753

URL: http://svn.apache.org/viewcvs?rev=382753&view=rev
Log:
allowed the maximum pending message count to be specified on the ActiveMQPrefetchPolicy or
the ConsumerInfo

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=382753&r1=382752&r2=382753&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Mar  3 02:34:33 2006
@@ -116,18 +116,19 @@
      * Create a MessageConsumer
      * 
      * @param session
-     * @param value
      * @param dest
      * @param name
      * @param selector
      * @param prefetch
+     * @param maximumPendingMessageCount TODO
      * @param noLocal
      * @param browser
      * @param dispatchAsync
+     * @param value
      * @throws JMSException
      */
     public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
-            String name, String selector, int prefetch, boolean noLocal, boolean browser, boolean dispatchAsync)
+            String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync)
             throws JMSException {
         if (dest == null) {
             throw new InvalidDestinationException("Don't understand null destinations");
@@ -158,6 +159,7 @@
         this.info = new ConsumerInfo(consumerId);
         this.info.setSubcriptionName(name);
         this.info.setPrefetchSize(prefetch);
+        this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
         this.info.setNoLocal(noLocal);
         this.info.setDispatchAsync(dispatchAsync);
         this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java?rev=382753&r1=382752&r2=382753&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java Fri Mar  3 02:34:33 2006
@@ -33,7 +33,7 @@
     private int topicPrefetch;
     private int durableTopicPrefetch;
     private int inputStreamPrefetch;
-
+    private int maximumPendingMessageLimit;
 
     /**
      * Initialize default prefetch policies
@@ -101,6 +101,19 @@
     public void setTopicPrefetch(int topicPrefetch) {
         this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
     }
+    
+    public int getMaximumPendingMessageLimit() {
+        return maximumPendingMessageLimit;
+    }
+
+    /**
+     * Sets how many messages a broker will keep around, above the prefetch limit, for non-durable
+     * topics before starting to discard older messages.
+     */
+    public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
+        this.maximumPendingMessageLimit = maximumPendingMessageLimit;
+    }
+
     
     private int getMaxPrefetchLimit(int value) {
         int result = Math.min(value, MAX_PREFETCH_SIZE);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java?rev=382753&r1=382752&r2=382753&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java Fri Mar  3 02:34:33 2006
@@ -97,7 +97,9 @@
      */
     private ActiveMQMessageConsumer createConsumer() throws JMSException {
         browseDone.set(false);
-        return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, session.connection.getPrefetchPolicy().getQueueBrowserPrefetch(), false, true, dispatchAsync) {
+        ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
+        return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(),
+                prefetchPolicy.getMaximumPendingMessageLimit(), false, true, dispatchAsync) {
             public void dispatch(MessageDispatch md) {
                 if( md.getMessage()==null ) {
                     browseDone.set(true);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java?rev=382753&r1=382752&r2=382753&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java Fri Mar  3 02:34:33 2006
@@ -67,9 +67,9 @@
      * @throws JMSException
      */
     protected ActiveMQQueueReceiver(ActiveMQSession theSession,
-                                    ConsumerId consumerId, ActiveMQDestination destination, String selector, int prefetch, boolean asyncDispatch)
+                                    ConsumerId consumerId, ActiveMQDestination destination, String selector, int prefetch, int maximumPendingMessageCount, boolean asyncDispatch)
             throws JMSException {
-        super(theSession, consumerId, destination, null, selector, prefetch, false, false, asyncDispatch);
+        super(theSession, consumerId, destination, null, selector, prefetch, maximumPendingMessageCount, false, false, asyncDispatch);
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=382753&r1=382752&r2=382753&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Fri Mar  3 02:34:33 2006
@@ -802,14 +802,16 @@
         checkClosed();
         int prefetch = 0;
 
+        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
         if (destination instanceof Topic) {
-            prefetch = connection.getPrefetchPolicy().getTopicPrefetch();
+            prefetch = prefetchPolicy.getTopicPrefetch();
         } else {
-            prefetch = connection.getPrefetchPolicy().getQueuePrefetch();
+            prefetch = prefetchPolicy.getQueuePrefetch();
         }
 
-        return new ActiveMQMessageConsumer(this, getNextConsumerId(), ActiveMQMessageTransformation
-                .transformDestination(destination), null, messageSelector, prefetch, false, false, asyncDispatch);
+        return new ActiveMQMessageConsumer(this, getNextConsumerId(),
+                ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, prefetch,
+                prefetchPolicy.getMaximumPendingMessageLimit(), false, false, asyncDispatch);
     }
 
     /**
@@ -870,9 +872,10 @@
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
             throws JMSException {
         checkClosed();
-        return new ActiveMQMessageConsumer(this, getNextConsumerId(), ActiveMQMessageTransformation
-                .transformDestination(destination), null, messageSelector, connection.getPrefetchPolicy()
-                .getTopicPrefetch(), NoLocal, false, asyncDispatch);
+        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
+        return new ActiveMQMessageConsumer(this, getNextConsumerId(), 
+                ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector,
+                prefetchPolicy.getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), NoLocal, false, asyncDispatch);
     }
 
     /**
@@ -1033,9 +1036,10 @@
             throws JMSException {
         checkClosed();
         connection.checkClientIDWasManuallySpecified();
+        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
         return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation
-                .transformDestination(topic), name, messageSelector, this.connection.getPrefetchPolicy()
-                .getDurableTopicPrefetch(), noLocal, false, asyncDispatch);
+                .transformDestination(topic), name, messageSelector, prefetchPolicy.getDurableTopicPrefetch(),
+                prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
     }
 
     /**
@@ -1155,8 +1159,10 @@
      */
     public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
         checkClosed();
+        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
         return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation
-                .transformDestination(queue), messageSelector, this.connection.getPrefetchPolicy().getQueuePrefetch(), asyncDispatch);
+                .transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
+                prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
     }
 
     /**
@@ -1247,9 +1253,10 @@
      */
     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
         checkClosed();
+        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
         return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation
-                .transformDestination(topic), null, messageSelector, this.connection.getPrefetchPolicy()
-                .getTopicPrefetch(), noLocal, false, asyncDispatch);
+                .transformDestination(topic), null, messageSelector, prefetchPolicy.getTopicPrefetch(),
+                prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java?rev=382753&r1=382752&r2=382753&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java Fri Mar  3 02:34:33 2006
@@ -112,9 +112,9 @@
      * @throws JMSException
      */
     protected ActiveMQTopicSubscriber(ActiveMQSession theSession,
-                                      ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch,
+                                      ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount,
                                       boolean noLocalValue, boolean browserValue, boolean asyncDispatch) throws JMSException {
-        super(theSession, consumerId, dest, name, selector, prefetch, noLocalValue, browserValue, asyncDispatch);
+        super(theSession, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocalValue, browserValue, asyncDispatch);
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=382753&r1=382752&r2=382753&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Mar  3 02:34:33 2006
@@ -20,6 +20,8 @@
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.filter.DestinationMapEntry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Represents an entry in a {@link PolicyMap} for assigning policies to a
@@ -31,6 +33,8 @@
  */
 public class PolicyEntry extends DestinationMapEntry {
 
+    private static final Log log = LogFactory.getLog(PolicyEntry.class);
+    
     private DispatchPolicy dispatchPolicy;
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private boolean sendAdvisoryIfNoConsumers;
@@ -64,7 +68,16 @@
     public void configure(TopicSubscription subscription) {
         if (pendingMessageLimitStrategy != null) {
             int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
+            int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
+            if (consumerLimit > 0) {
+                if (value < 0 || consumerLimit < value) {
+                    value = consumerLimit;
+                }
+            }
             if (value >= 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Setting the maximumPendingMessages size to: " + value + " for consumer: " + subscription.getInfo().getConsumerId());
+                }
                 subscription.setMaximumPendingMessages(value);
             }
         }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java?rev=382753&r1=382752&r2=382753&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java Fri Mar  3 02:34:33 2006
@@ -38,6 +38,7 @@
     protected ConsumerId consumerId;
     protected ActiveMQDestination destination;
     protected int prefetchSize;
+    protected int maximumPendingMessageLimit;
     protected boolean browser;
     protected boolean dispatchAsync;
     protected String selector;
@@ -73,6 +74,7 @@
         info.consumerId = consumerId;
         info.destination = destination;
         info.prefetchSize = prefetchSize;
+        info.maximumPendingMessageLimit = maximumPendingMessageLimit;
         info.browser = browser;
         info.dispatchAsync = dispatchAsync;
         info.selector = selector;
@@ -141,6 +143,20 @@
 
     public void setPrefetchSize(int prefetchSize) {
         this.prefetchSize = prefetchSize;
+    }
+
+    /**
+     * How many messages a broker will keep around, above the prefetch limit, for non-durable
+     * topics before starting to discard older messages.
+     * 
+     * @openwire:property version=1
+     */
+    public int getMaximumPendingMessageLimit() {
+        return maximumPendingMessageLimit;
+    }
+
+    public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
+        this.maximumPendingMessageLimit = maximumPendingMessageLimit;
     }
 
     /**



Mime
View raw message