activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r659203 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ main/java/o...
Date Thu, 22 May 2008 18:56:04 GMT
Author: rajdavies
Date: Thu May 22 11:56:03 2008
New Revision: 659203

URL: http://svn.apache.org/viewvc?rev=659203&view=rev
Log:
Fix for:
https://issues.apache.org/activemq/browse/AMQ-1704
https://issues.apache.org/activemq/browse/AMQ-1679
https://issues.apache.org/activemq/browse/AMQ-609

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Thu May 22 11:56:03 2008
@@ -16,12 +16,10 @@
  */
 package org.apache.activemq.advisory;
 
-import java.util.Date;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.ConnectionContext;
@@ -38,10 +36,12 @@
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.usage.Usage;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.commons.logging.Log;
@@ -72,7 +72,7 @@
     }
 
     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
-        next.addConnection(context, info);
+        super.addConnection(context, info);
 
         ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
         fireAdvisory(context, topic, info);
@@ -80,7 +80,7 @@
     }
 
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
-        Subscription answer = next.addConsumer(context, info);
+        Subscription answer = super.addConsumer(context, info);
         
         // Don't advise advisory topics.
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
@@ -133,7 +133,7 @@
     }
 
     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
-        next.addProducer(context, info);
+        super.addProducer(context, info);
 
         // Don't advise advisory topics.
         if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
@@ -144,7 +144,7 @@
     }
 
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
-        Destination answer = next.addDestination(context, destination);
+        Destination answer = super.addDestination(context, destination);
         if (!AdvisorySupport.isAdvisoryTopic(destination)) {
             DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
             DestinationInfo previous = destinations.putIfAbsent(destination, info);
@@ -170,7 +170,7 @@
     }
 
     public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
-        next.removeDestination(context, destination, timeout);
+        super.removeDestination(context, destination, timeout);
         DestinationInfo info = destinations.remove(destination);
         if (info != null) {
             info.setDestination(destination);
@@ -190,7 +190,7 @@
     }
 
     public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
-        next.removeDestinationInfo(context, destInfo);   
+        super.removeDestinationInfo(context, destInfo);   
         DestinationInfo info = destinations.remove(destInfo.getDestination());
         if (info != null) {
             info.setDestination(destInfo.getDestination());
@@ -211,7 +211,7 @@
     }
 
     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
-        next.removeConnection(context, info, error);
+        super.removeConnection(context, info, error);
 
         ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
         fireAdvisory(context, topic, info.createRemoveCommand());
@@ -219,7 +219,7 @@
     }
 
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
-        next.removeConsumer(context, info);
+        super.removeConsumer(context, info);
 
         // Don't advise advisory topics.
         ActiveMQDestination dest = info.getDestination();
@@ -233,7 +233,7 @@
     }
 
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
-        next.removeProducer(context, info);
+        super.removeProducer(context, info);
 
         // Don't advise advisory topics.
         ActiveMQDestination dest = info.getDestination();
@@ -247,14 +247,92 @@
     }
 
     public void messageExpired(ConnectionContext context, MessageReference messageReference) {
-        next.messageExpired(context, messageReference);
+        super.messageExpired(context, messageReference);
         try {
-            ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
-            fireAdvisory(context, topic, messageReference.getMessage());
+            if(!messageReference.isAdvisory()) {
+                ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
+                Message payload = messageReference.getMessage().copy();
+                payload.clearBody();
+                fireAdvisory(context, topic,payload);
+            }
         } catch (Exception e) {
             LOG.warn("Failed to fire message expired advisory");
         }
     }
+    
+    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
+        super.messageConsumed(context, messageReference);
+        try {
+            if(!messageReference.isAdvisory()) {
+                ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
+                Message payload = messageReference.getMessage().copy();
+                payload.clearBody();
+                fireAdvisory(context, topic,payload);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to fire message consumed advisory");
+        }
+    }
+    
+    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
+        super.messageDelivered(context, messageReference);
+        try {
+            if (!messageReference.isAdvisory()) {
+                ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
+                Message payload = messageReference.getMessage().copy();
+                payload.clearBody();
+                fireAdvisory(context, topic,payload);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to fire message delivered advisory");
+        }
+    }
+    
+    public void messageDiscarded(ConnectionContext context, MessageReference messageReference) {
+        super.messageDiscarded(context, messageReference);
+        try {
+            if (!messageReference.isAdvisory()) {
+                ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
+                Message payload = messageReference.getMessage().copy();
+                payload.clearBody();
+                fireAdvisory(context, topic,payload);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to fire message discarded advisory");
+        }
+    }
+    
+    public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
+        super.slowConsumer(context, destination,subs);
+        try {
+            ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
+            fireAdvisory(context, topic,subs.getConsumerInfo());
+        } catch (Exception e) {
+            LOG.warn("Failed to fire message slow consumer advisory");
+        }
+    }
+    
+    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
+        super.fastProducer(context, producerInfo);
+        try {
+            ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
+            fireAdvisory(context, topic,producerInfo);
+        } catch (Exception e) {
+            LOG.warn("Failed to fire message fast producer advisory");
+        }
+    }
+    
+    public void isFull(ConnectionContext context,Destination destination,Usage usage) {
+        super.isFull(context,destination, usage);
+        try {
+            ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination());
+            ActiveMQMessage advisoryMessage = new ActiveMQMessage();           
+            advisoryMessage.setStringProperty("usageName", usage.getName());
+            fireAdvisory(context, topic,advisoryMessage);
+        } catch (Exception e) {
+            LOG.warn("Failed to fire message is full advisory");
+        }
+    }
 
     protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
         fireAdvisory(context, topic, command, null);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Thu May 22 11:56:03 2008
@@ -39,6 +39,12 @@
     public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
     public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
     public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
+    public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
+    public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastProducer."; // prefix of the fast-producer advisory topics
+    public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
+    public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
+    public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
+    public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
     public static final String AGENT_TOPIC = "ActiveMQ.Agent";
     public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
     public static final String MSG_PROPERTY_ORIGIN_BROKER_ID="originBrokerId";
@@ -95,6 +101,48 @@
         String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
         return new ActiveMQTopic(name);
     }
+    
+    public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
+        String name = SLOW_CONSUMER_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
+        String name = FAST_PRODUCER_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
+        String name = MESSAGE_DISCAREDED_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
+        String name = MESSAGE_DELIVERED_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
+        String name = MESSAGE_CONSUMED_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
+    
+    public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
+        String name = FULL_TOPIC_PREFIX
+                + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName();
+        return new ActiveMQTopic(name);
+    }
 
     public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
         switch (destination.getDestinationType()) {
@@ -181,6 +229,90 @@
             return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX);
         }
     }
+    
+    public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
+        }
+    }
+    
+    public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isFullAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
+        }
+    }
 
     /**
      * Returns the agent topic which is used to send commands to the broker

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Thu May 22 11:56:03 2008
@@ -19,8 +19,10 @@
 import java.net.URI;
 import java.util.Set;
 import org.apache.activemq.Service;
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Region;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -31,6 +33,7 @@
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
+import org.apache.activemq.usage.Usage;
 
 /**
  * The Message Broker which routes messages, maintains subscriptions and
@@ -317,5 +320,51 @@
      * @return the broker sequence id
      */
     long getBrokerSequenceId();
+    
+    /**
+     * Called when a message is consumed
+     * @param context
+     * @param messageReference
+     */
+    void messageConsumed(ConnectionContext context, MessageReference messageReference);
+    
+    /**
+     * Called when a message is delivered to the broker
+     * @param context
+     * @param messageReference
+     */
+    void messageDelivered(ConnectionContext context, MessageReference messageReference);
+    
+    /**
+     * Called when a message is discarded - e.g. running low on memory
+     * This will happen only if the policy is enabled - e.g. non durable topics
+     * @param context
+     * @param messageReference
+     */
+    void messageDiscarded(ConnectionContext context, MessageReference messageReference);
+    
+    /**
+     * Called when there is a slow consumer
+     * @param context
+     * @param destination 
+     * @param subs
+     */
+    void slowConsumer(ConnectionContext context,Destination destination, Subscription subs);
+    
+    /**
+     * Called to notify that a producer is too fast
+     * @param context
+     * @param producerInfo
+     */
+    void fastProducer(ConnectionContext context,ProducerInfo producerInfo);
+    
+    /**
+     * Called when a Usage reaches a limit
+     * @param context
+     * @param destination 
+     * @param usage
+     */
+    void isFull(ConnectionContext context,Destination destination,Usage usage);
+
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Thu May 22 11:56:03 2008
@@ -39,6 +39,7 @@
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
+import org.apache.activemq.usage.Usage;
 
 /**
  * Allows you to intercept broker operation so that features such as security
@@ -264,4 +265,29 @@
     public long getBrokerSequenceId() {
         return next.getBrokerSequenceId();
     }
+
+   
+    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
+        next.fastProducer(context, producerInfo);
+    }
+
+    public void isFull(ConnectionContext context,Destination destination, Usage usage) {
+        next.isFull(context,destination, usage);
+    }
+
+    public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
+        next.messageConsumed(context, messageReference);
+    }
+
+    public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
+        next.messageDelivered(context, messageReference);
+    }
+
+    public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
+        next.messageDiscarded(context, messageReference);
+    }
+
+    public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
+        next.slowConsumer(context, destination,subs);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Thu May 22 11:56:03 2008
@@ -40,6 +40,7 @@
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
+import org.apache.activemq.usage.Usage;
 
 /**
  * Dumb implementation - used to be overriden by listeners
@@ -256,4 +257,22 @@
     public long getBrokerSequenceId() {
         return -1l;
     }
+    
+    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
+    }
+
+    public void isFull(ConnectionContext context, Destination destination,Usage usage) {
+    }
+
+    public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
+    }
+
+    public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
+    }
+
+    public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
+    }
+
+    public void slowConsumer(ConnectionContext context,Destination destination, Subscription subs) {
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Thu May 22 11:56:03 2008
@@ -40,6 +40,7 @@
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
+import org.apache.activemq.usage.Usage;
 
 /**
  * Implementation of the broker where all it's methods throw an
@@ -267,4 +268,28 @@
     public long getBrokerSequenceId() {
         throw new BrokerStoppedException(this.message);
     }
+    
+    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
+        throw new BrokerStoppedException(this.message);
+    }
+
+    public void isFull(ConnectionContext context,Destination destination, Usage usage) {
+        throw new BrokerStoppedException(this.message);
+    }
+
+    public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
+        throw new BrokerStoppedException(this.message);
+    }
+
+    public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
+        throw new BrokerStoppedException(this.message);
+    }
+
+    public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
+        throw new BrokerStoppedException(this.message);
+    }
+
+    public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
+        throw new BrokerStoppedException(this.message);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Thu May 22 11:56:03 2008
@@ -41,6 +41,7 @@
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
+import org.apache.activemq.usage.Usage;
 
 /**
  * Like a BrokerFilter but it allows you to switch the getNext().broker. This
@@ -277,5 +278,29 @@
     public long getBrokerSequenceId() {
         return getNext().getBrokerSequenceId();
     }
+    
+    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
+        getNext().fastProducer(context, producerInfo);
+    }
+
+    public void isFull(ConnectionContext context,Destination destination, Usage usage) {
+        getNext().isFull(context,destination, usage);
+    }
+
+    public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
+        getNext().messageConsumed(context, messageReference);
+    }
+
+    public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
+        getNext().messageDelivered(context, messageReference);
+    }
+
+    public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
+        getNext().messageDiscarded(context, messageReference);
+    }
+
+    public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) {
+        getNext().slowConsumer(context, dest,subs);
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Thu May 22 11:56:03 2008
@@ -190,6 +190,23 @@
         }
         return Integer.MAX_VALUE;
     }
+    
+    /**
+     * Add a destination
+     * @param destination
+     */
+    public void addDestination(Destination destination) {
+        
+    }
+    
+    
+    /**
+     * Remove a destination
+     * @param destination
+     */
+    public void removeDestination(Destination destination) {
+        
+    }
 
     protected void doAddRecoveredMessage(MessageReference message) throws Exception {
         add(message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu May 22 11:56:03 2008
@@ -24,12 +24,17 @@
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.Usage;
 
 
 /**
  * @version $Revision: 1.12 $
  */
 public abstract class BaseDestination implements Destination {
+    /**
+     * The default number of messages to page in to the destination
+     * from persistent storage
+     */
     public static final int DEFAULT_PAGE_SIZE=100;
     protected final ActiveMQDestination destination;
     protected final Broker broker;
@@ -44,6 +49,12 @@
     private boolean useCache=true;
     private int minimumMessageSize=1024;
     private boolean lazyDispatch=false;
+    private boolean advisoryForSlowConsumers;
+    private boolean advisdoryForFastProducers;
+    private boolean advisoryForDiscardingMessages;
+    private boolean advisoryWhenFull;
+    private boolean advisoryForDelivery;
+    private boolean advisoryForConsumed;
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
     protected final BrokerService brokerService;
     protected final Broker regionBroker;
@@ -200,4 +211,157 @@
     protected long getDestinationSequenceId() {
         return regionBroker.getBrokerSequenceId();
     }
+
+    /**
+     * @return the advisoryForSlowConsumers
+     */
+    public boolean isAdvisoryForSlowConsumers() {
+        return advisoryForSlowConsumers;
+    }
+
+    /**
+     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
+     */
+    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
+        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
+    }
+
+    /**
+     * @return the advisoryForDiscardingMessages
+     */
+    public boolean isAdvisoryForDiscardingMessages() {
+        return advisoryForDiscardingMessages;
+    }
+
+    /**
+     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set
+     */
+    public void setAdvisoryForDiscardingMessages(
+            boolean advisoryForDiscardingMessages) {
+        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
+    }
+
+    /**
+     * @return the advisoryWhenFull
+     */
+    public boolean isAdvisoryWhenFull() {
+        return advisoryWhenFull;
+    }
+
+    /**
+     * @param advisoryWhenFull the advisoryWhenFull to set
+     */
+    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
+        this.advisoryWhenFull = advisoryWhenFull;
+    }
+
+    /**
+     * @return the advisoryForDelivery
+     */
+    public boolean isAdvisoryForDelivery() {
+        return advisoryForDelivery;
+    }
+
+    /**
+     * @param advisoryForDelivery the advisoryForDelivery to set
+     */
+    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
+        this.advisoryForDelivery = advisoryForDelivery;
+    }
+
+    /**
+     * @return the advisoryForConsumed
+     */
+    public boolean isAdvisoryForConsumed() {
+        return advisoryForConsumed;
+    }
+
+    /**
+     * @param advisoryForConsumed the advisoryForConsumed to set
+     */
+    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
+        this.advisoryForConsumed = advisoryForConsumed;
+    }
+
+    /**
+     * @return the advisdoryForFastProducers
+     */
+    public boolean isAdvisdoryForFastProducers() {
+        return advisdoryForFastProducers;
+    }
+
+    /**
+     * @param advisdoryForFastProducers the advisdoryForFastProducers to set
+     */
+    public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
+        this.advisdoryForFastProducers = advisdoryForFastProducers;
+    }
+    
+    /**
+     * Called when a message is consumed
+     * @param context
+     * @param messageReference
+     */
+    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
+        if (advisoryForConsumed) {
+            broker.messageConsumed(context, messageReference);
+        }
+    }
+    
+    /**
+     * Called when a message is delivered to the broker
+     * @param context
+     * @param messageReference
+     */
+    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
+        if(advisoryForDelivery) {
+            broker.messageDelivered(context, messageReference);
+        }
+    }
+    
+    /**
+     * Called when a message is discarded - e.g. when running low on memory.
+     * This will happen only if the policy is enabled - e.g. for non-durable topics.
+     * @param context
+     * @param messageReference
+     */
+    public void messageDiscarded(ConnectionContext context, MessageReference messageReference) {
+        if (advisoryForDiscardingMessages) {
+            broker.messageDiscarded(context, messageReference);
+        }
+    }
+    
+    /**
+     * Called when there is a slow consumer
+     * @param context
+     * @param subs
+     */
+    public void slowConsumer(ConnectionContext context, Subscription subs) {
+        if(advisoryForSlowConsumers) {
+            broker.slowConsumer(context, this, subs);
+        }
+    }
+    
+    /**
+     * Called to notify a producer is too fast
+     * @param context
+     * @param producerInfo
+     */
+    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
+        if(advisdoryForFastProducers) {
+            broker.fastProducer(context, producerInfo);
+        }
+    }
+    
+    /**
+     * Called when a Usage reaches a limit
+     * @param context
+     * @param usage
+     */
+    public void isFull(ConnectionContext context,Usage usage) {
+        if(advisoryWhenFull) {
+            broker.isFull(context,this, usage);
+        }
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Thu May 22 11:56:03 2008
@@ -29,6 +29,7 @@
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.Usage;
 
 /**
  * @version $Revision: 1.12 $
@@ -114,4 +115,48 @@
     public void setLazyDispatch(boolean value);
 
     void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node);
+
+    /**
+     * Called when a message is consumed
+     * @param context
+     * @param messageReference
+     */
+     void messageConsumed(ConnectionContext context, MessageReference messageReference);
+    
+    /**
+     * Called when a message is delivered to the broker
+     * @param context
+     * @param messageReference
+     */
+     void messageDelivered(ConnectionContext context, MessageReference messageReference);
+    
+    /**
+     * Called when a message is discarded - e.g. when running low on memory.
+     * This will happen only if the policy is enabled - e.g. for non-durable topics.
+     * @param context
+     * @param messageReference
+     */
+     void messageDiscarded(ConnectionContext context, MessageReference messageReference);
+    
+    /**
+     * Called when there is a slow consumer
+     * @param context
+     * @param subs
+     */
+     void slowConsumer(ConnectionContext context, Subscription subs);
+    
+    /**
+     * Called to notify a producer is too fast
+     * @param context
+     * @param producerInfo
+     */
+     void fastProducer(ConnectionContext context,ProducerInfo producerInfo);
+    
+    /**
+     * Called when a Usage reaches a limit
+     * @param context
+     * @param usage
+     */
+     void isFull(ConnectionContext context,Usage usage);
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Thu May 22 11:56:03 2008
@@ -29,6 +29,7 @@
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.Usage;
 
 /**
  * 
@@ -208,4 +209,33 @@
 	public boolean iterate() {
 		return next.iterate();
 	}
+
+    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
+       next.fastProducer(context, producerInfo);       
+    }
+
+   
+    public void isFull(ConnectionContext context, Usage usage) {
+       next.isFull(context, usage);
+    }
+
+   
+    public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
+        next.messageConsumed(context, messageReference);
+    }
+
+    
+    public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
+        next.messageDelivered(context, messageReference);
+    }
+
+    
+    public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
+        next.messageDiscarded(context, messageReference);
+    }
+
+    
+    public void slowConsumer(ConnectionContext context, Subscription subs) {
+       next.slowConsumer(context, subs);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Thu May 22 11:56:03 2008
@@ -157,4 +157,8 @@
     public synchronized int getSize() {
        return message.getSize();
     }
+
+    public boolean isAdvisory() {
+       return message.isAdvisory();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java Thu May 22 11:56:03 2008
@@ -61,4 +61,9 @@
      */
     boolean isDropped();
     
+    /**
+     * @return true if the message is an advisory
+     */
+    boolean isAdvisory();
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java Thu May 22 11:56:03 2008
@@ -123,4 +123,8 @@
         throw new RuntimeException("not implemented");
     }
 
+    public boolean isAdvisory() {
+        return false;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu May 22 11:56:03 2008
@@ -65,6 +65,7 @@
     private final Object pendingLock = new Object();
     private final Object dispatchLock = new Object();
     protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
+    private boolean slowConsumer;
 
     public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
         super(broker,context, info);
@@ -499,6 +500,7 @@
                 try {
                     int numberToDispatch = countBeforeFull();
                     if (numberToDispatch > 0) {
+                        slowConsumer=false;
                         pending.setMaxBatchSize(numberToDispatch);
                         int count = 0;
                         pending.reset();
@@ -525,6 +527,16 @@
                                 count++;
                             }
                         }
+                    }else {
+                        if (!slowConsumer) {
+                            slowConsumer=true;
+                            ConnectionContext c = new ConnectionContext();
+                            c.setBroker(context.getBroker());
+                            for (Destination dest :destinations) {
+                                dest.slowConsumer(c,this);
+                            }
+                            
+                        }
                     }
                 } finally {
                     pending.release();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu May 22 11:56:03 2008
@@ -337,79 +337,83 @@
             }
             return;
         }
-        if (isProducerFlowControl() && context.isProducerFlowControl() && memoryUsage.isFull()) {
-            if (systemUsage.isSendFailIfNoSpace()) {
-                throw new javax.jms.ResourceAllocationException("SystemUsage memory limit reached");
-            }
-
-            // We can avoid blocking due to low usage if the producer is sending
-            // a sync message or
-            // if it is using a producer window
-            if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
-                synchronized (messagesWaitingForSpace) {
-                    messagesWaitingForSpace.add(new Runnable() {
-                        public void run() {
-
-                            try {
-
-                                // While waiting for space to free up... the
-                                // message may have expired.
-                                if (broker.isExpired(message)) {
-                                    broker.messageExpired(context, message);
-                                    //message not added to stats yet
-                                    //destinationStatistics.getMessages().decrement();
-                                } else {
-                                    doMessageSend(producerExchange, message);
-                                }
-
-                                if (sendProducerAck) {
-                                    ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
-                                    context.getConnection().dispatchAsync(ack);
-                                } else {
-                                    Response response = new Response();
-                                    response.setCorrelationId(message.getCommandId());
-                                    context.getConnection().dispatchAsync(response);
-                                }
-
-                            } catch (Exception e) {
-                                if (!sendProducerAck && !context.isInRecoveryMode()) {
-                                    ExceptionResponse response = new ExceptionResponse(e);
-                                    response.setCorrelationId(message.getCommandId());
-                                    context.getConnection().dispatchAsync(response);
+        if(memoryUsage.isFull()) {
+            isFull(context, memoryUsage);
+            fastProducer(context, producerInfo);
+            if (isProducerFlowControl() && context.isProducerFlowControl()) {
+                if (systemUsage.isSendFailIfNoSpace()) {
+                    throw new javax.jms.ResourceAllocationException("SystemUsage memory limit reached");
+                }
+    
+                // We can avoid blocking due to low usage if the producer is sending
+                // a sync message or
+                // if it is using a producer window
+                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
+                    synchronized (messagesWaitingForSpace) {
+                        messagesWaitingForSpace.add(new Runnable() {
+                            public void run() {
+    
+                                try {
+    
+                                    // While waiting for space to free up... the
+                                    // message may have expired.
+                                    if (broker.isExpired(message)) {
+                                        broker.messageExpired(context, message);
+                                        //message not added to stats yet
+                                        //destinationStatistics.getMessages().decrement();
+                                    } else {
+                                        doMessageSend(producerExchange, message);
+                                    }
+    
+                                    if (sendProducerAck) {
+                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
+                                        context.getConnection().dispatchAsync(ack);
+                                    } else {
+                                        Response response = new Response();
+                                        response.setCorrelationId(message.getCommandId());
+                                        context.getConnection().dispatchAsync(response);
+                                    }
+    
+                                } catch (Exception e) {
+                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
+                                        ExceptionResponse response = new ExceptionResponse(e);
+                                        response.setCorrelationId(message.getCommandId());
+                                        context.getConnection().dispatchAsync(response);
+                                    }
                                 }
                             }
+                        });
+    
+                        // If the usage manager is not full, then the task will not
+                        // get called.
+                        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
+                            // so call it directly here.
+                            sendMessagesWaitingForSpaceTask.run();
                         }
-                    });
-
-                    // If the user manager is not full, then the task will not
-                    // get called..
-                    if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
-                        // so call it directly here.
-                        sendMessagesWaitingForSpaceTask.run();
+                        context.setDontSendReponse(true);
+                        return;
                     }
-                    context.setDontSendReponse(true);
-                    return;
-                }
-
-            } else {
-
-                // Producer flow control cannot be used, so we have do the flow
-                // control at the broker
-                // by blocking this thread until there is space available.
-                while (!memoryUsage.waitForSpace(1000)) {
-                    if (context.getStopping().get()) {
-                        throw new IOException("Connection closed, send aborted.");
+    
+                } else {
+    
+                    // Producer flow control cannot be used, so we have to do the flow
+                    // control at the broker
+                    // by blocking this thread until there is space available.
+                    while (!memoryUsage.waitForSpace(1000)) {
+                        if (context.getStopping().get()) {
+                            throw new IOException("Connection closed, send aborted.");
+                        }
                     }
-                }
-
-                // The usage manager could have delayed us by the time
-                // we unblock the message could have expired..
-                if (message.isExpired()) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Expired message: " + message);
+    
+                    // The usage manager could have delayed us; by the time
+                    // we unblock, the message could have expired.
+                    if (message.isExpired()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Expired message: " + message);
+                        }
+                        broker.getRoot().messageExpired(context, message);
+                        return;
                     }
-                    broker.getRoot().messageExpired(context, message);
-                    return;
                 }
             }
         }
@@ -485,6 +489,7 @@
 	}
     
     public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
+        messageConsumed(context, node);
         if (store != null && node.isPersistent()) {
             // the original ack may be a ranged ack, but we are trying to delete
             // a specific
@@ -1062,6 +1067,7 @@
         }
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
+        messageDelivered(context, msg);
         wakeup();
     }
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu May 22 11:56:03 2008
@@ -58,6 +58,7 @@
 import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.Usage;
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
@@ -678,6 +679,24 @@
         }
         getRoot().sendToDeadLetterQueue(context, node);
     }
+    
+    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
+    }
+
+    public void isFull(ConnectionContext context,Destination destination, Usage usage) {
+    }
+
+    public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
+    }
+
+    public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
+    }
+
+    public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
+    }
+
+    public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) {
+    }
 
     public void sendToDeadLetterQueue(ConnectionContext context,
 	        MessageReference node){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu May 22 11:56:03 2008
@@ -57,6 +57,7 @@
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tools.ant.taskdefs.condition.IsFalse;
 
 /**
  * The Topic is a destination that sends a copy of a message to every active
@@ -277,90 +278,95 @@
             return;
         }
 
-        if (isProducerFlowControl() && context.isProducerFlowControl() && memoryUsage.isFull()) {
-            if (systemUsage.isSendFailIfNoSpace()) {
-                throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
-            }
-
-            // We can avoid blocking due to low usage if the producer is sending
-            // a sync message or
-            // if it is using a producer window
-            if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
-                synchronized (messagesWaitingForSpace) {
-                    messagesWaitingForSpace.add(new Runnable() {
-                        public void run() {
-                            
-                            try {
-
-                                // While waiting for space to free up... the
-                                // message may have expired.
-                                if (broker.isExpired(message)) {
-                                    broker.messageExpired(context, message);
-                                    //destinationStatistics.getEnqueues().increment();
-                                    //destinationStatistics.getMessages().decrement();
-                                } else {
-                                    doMessageSend(producerExchange, message);
-                                }
-
-                                if (sendProducerAck) {
-                                    ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
-                                    context.getConnection().dispatchAsync(ack);
-                                } else {
-                                    Response response = new Response();
-                                    response.setCorrelationId(message.getCommandId());
-                                    context.getConnection().dispatchAsync(response);
-                                }
-
-                            } catch (Exception e) {
-                                if (!sendProducerAck && !context.isInRecoveryMode()) {
-                                    ExceptionResponse response = new ExceptionResponse(e);
-                                    response.setCorrelationId(message.getCommandId());
-                                    context.getConnection().dispatchAsync(response);
+        if(memoryUsage.isFull()) {
+            isFull(context, memoryUsage);
+            fastProducer(context, producerInfo);
+            if (isProducerFlowControl() && context.isProducerFlowControl()) {
+                if (systemUsage.isSendFailIfNoSpace()) {
+                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
+                }
+    
+                // We can avoid blocking due to low usage if the producer is sending
+                // a sync message or
+                // if it is using a producer window
+                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
+                    synchronized (messagesWaitingForSpace) {
+                        messagesWaitingForSpace.add(new Runnable() {
+                            public void run() {
+                                
+                                try {
+    
+                                    // While waiting for space to free up... the
+                                    // message may have expired.
+                                    if (broker.isExpired(message)) {
+                                        broker.messageExpired(context, message);
+                                        //destinationStatistics.getEnqueues().increment();
+                                        //destinationStatistics.getMessages().decrement();
+                                    } else {
+                                        doMessageSend(producerExchange, message);
+                                    }
+    
+                                    if (sendProducerAck) {
+                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
+                                        context.getConnection().dispatchAsync(ack);
+                                    } else {
+                                        Response response = new Response();
+                                        response.setCorrelationId(message.getCommandId());
+                                        context.getConnection().dispatchAsync(response);
+                                    }
+    
+                                } catch (Exception e) {
+                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
+                                        ExceptionResponse response = new ExceptionResponse(e);
+                                        response.setCorrelationId(message.getCommandId());
+                                        context.getConnection().dispatchAsync(response);
+                                    }
                                 }
+                                
                             }
-                            
+                        });
+    
+                        // If the usage manager is not full, then the task will not
+                        // get called..
+                        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
+                            // so call it directly here.
+                            sendMessagesWaitingForSpaceTask.run();
                         }
-                    });
-
-                    // If the user manager is not full, then the task will not
-                    // get called..
-                    if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
-                        // so call it directly here.
-                        sendMessagesWaitingForSpaceTask.run();
+                        context.setDontSendReponse(true);
+                        return;
                     }
-                    context.setDontSendReponse(true);
-                    return;
-                }
-
-            } else {
-
-                // Producer flow control cannot be used, so we have do the flow
-                // control at the broker
-                // by blocking this thread until there is space available.
-                int count = 0;
-                while (!memoryUsage.waitForSpace(1000)) {
-                    if (context.getStopping().get()) {
-                        throw new IOException("Connection closed, send aborted.");
-                    }
-                    if (count > 2 && context.isInTransaction()) {
-                        count =0;
-                        int size = context.getTransaction().size();
-                        LOG.warn("Waiting for space to send  transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
+    
+                } else {
+    
+                    // Producer flow control cannot be used, so we have to do the flow
+                    // control at the broker
+                    // by blocking this thread until there is space available.
+                    int count = 0;
+                    while (!memoryUsage.waitForSpace(1000)) {
+                        if (context.getStopping().get()) {
+                            throw new IOException("Connection closed, send aborted.");
+                        }
+                        if (count > 2 && context.isInTransaction()) {
+                            count =0;
+                            int size = context.getTransaction().size();
+                            LOG.warn("Waiting for space to send  transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
+                        }
                     }
-                }
-
-                // The usage manager could have delayed us by the time
-                // we unblock the message could have expired..
-                if (message.isExpired()) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Expired message: " + message);
+    
+                    // The usage manager could have delayed us; by the time
+                    // we unblock, the message could have expired.
+                    if (message.isExpired()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Expired message: " + message);
+                        }
+                        return;
                     }
-                    return;
                 }
             }
         }
 
         doMessageSend(producerExchange, message);
+        messageDelivered(context, message);
         if (sendProducerAck) {
             ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
             context.getConnection().dispatchAsync(ack);
@@ -445,6 +451,7 @@
             SubscriptionKey key = dsub.getSubscriptionKey();
             topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId());
         }
+        messageConsumed(context, node);
     }
 
     public void dispose(ConnectionContext context) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Thu May 22 11:56:03 2008
@@ -61,6 +61,7 @@
     private final AtomicLong enqueueCounter = new AtomicLong(0);
     private final AtomicLong dequeueCounter = new AtomicLong(0);
     private int memoryUsageHighWaterMark = 95;
+    private boolean slowConsumer;
 
     public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
         super(broker, context, info);
@@ -87,7 +88,15 @@
             // have not been dispatched (i.e. we allow the prefetch buffer to be
             // filled)
             dispatch(node);
+            slowConsumer=false;
         } else {
+          //we are slow
+            if(!slowConsumer) {
+                slowConsumer=true;
+                for (Destination dest: destinations) {
+                    dest.slowConsumer(getContext(), this);
+                }
+            }
             if (maximumPendingMessages != 0) {
                 synchronized (matchedListMutex) {
                     matched.addMessageLast(node);
@@ -432,6 +441,10 @@
         if (LOG.isDebugEnabled()) {
             LOG.debug("Discarding message " + message);
         }
+        Destination dest = message.getRegionDestination();
+        if (dest != null) {
+            dest.messageDiscarded(getContext(), message);
+        }
         broker.getRoot().sendToDeadLetterQueue(getContext(), message);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Thu May 22 11:56:03 2008
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region.policy;
 
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.Topic;
@@ -62,8 +63,15 @@
     private boolean useConsumerPriority=true;
     private boolean strictOrderDispatch=false;
     private boolean lazyDispatch=false;
+    private boolean advisoryForSlowConsumers;
+    private boolean advisdoryForFastProducers;
+    private boolean advisoryForDiscardingMessages;
+    private boolean advisoryWhenFull;
+    private boolean advisoryForDelivery;
+    private boolean advisoryForConsumed;
    
     public void configure(Broker broker,Queue queue) {
+        baseConfiguration(queue);
         if (dispatchPolicy != null) {
             queue.setDispatchPolicy(dispatchPolicy);
         }
@@ -78,20 +86,16 @@
             PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue);
             queue.setMessages(messages);
         }
-        queue.setProducerFlowControl(isProducerFlowControl());
-        queue.setEnableAudit(isEnableAudit());
-        queue.setMaxAuditDepth(getMaxQueueAuditDepth());
-        queue.setMaxProducersToAudit(getMaxProducersToAudit());
-        queue.setMaxPageSize(getMaxPageSize());
-        queue.setUseCache(isUseCache());
-        queue.setMinimumMessageSize((int) getMinimumMessageSize());
+        
         queue.setUseConsumerPriority(isUseConsumerPriority());
         queue.setStrictOrderDispatch(isStrictOrderDispatch());
         queue.setOptimizedDispatch(isOptimizedDispatch());
         queue.setLazyDispatch(isLazyDispatch());
+        
     }
 
     public void configure(Topic topic) {
+        baseConfiguration(topic);
         if (dispatchPolicy != null) {
             topic.setDispatchPolicy(dispatchPolicy);
         }
@@ -105,15 +109,24 @@
         if (memoryLimit > 0) {
             topic.getMemoryUsage().setLimit(memoryLimit);
         }
-        topic.setProducerFlowControl(isProducerFlowControl());
-        topic.setEnableAudit(isEnableAudit());
-        topic.setMaxAuditDepth(getMaxAuditDepth());
-        topic.setMaxProducersToAudit(getMaxProducersToAudit());
-        topic.setMaxPageSize(getMaxPageSize());
-        topic.setUseCache(isUseCache());
-        topic.setMinimumMessageSize((int) getMinimumMessageSize());
         topic.setLazyDispatch(isLazyDispatch());
     }
+    
+    public void baseConfiguration(BaseDestination destination) {
+        destination.setProducerFlowControl(isProducerFlowControl());
+        destination.setEnableAudit(isEnableAudit());
+        destination.setMaxAuditDepth(getMaxQueueAuditDepth());
+        destination.setMaxProducersToAudit(getMaxProducersToAudit());
+        destination.setMaxPageSize(getMaxPageSize());
+        destination.setUseCache(isUseCache());
+        destination.setMinimumMessageSize((int) getMinimumMessageSize());
+        destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
+        destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
+        destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages());
+        destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
+        destination.setAdvisdoryForFastProducers(isAdvisdoryForFastProducers());
+        destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
+    }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
         if (pendingMessageLimitStrategy != null) {
@@ -415,4 +428,89 @@
         this.lazyDispatch = lazyDispatch;
     }
 
+    /**
+     * @return the advisoryForSlowConsumers
+     */
+    public boolean isAdvisoryForSlowConsumers() {
+        return advisoryForSlowConsumers;
+    }
+
+    /**
+     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
+     */
+    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
+        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
+    }
+
+    /**
+     * @return the advisoryForDiscardingMessages
+     */
+    public boolean isAdvisoryForDiscardingMessages() {
+        return advisoryForDiscardingMessages;
+    }
+
+    /**
+     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set
+     */
+    public void setAdvisoryForDiscardingMessages(
+            boolean advisoryForDiscardingMessages) {
+        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
+    }
+
+    /**
+     * @return the advisoryWhenFull
+     */
+    public boolean isAdvisoryWhenFull() {
+        return advisoryWhenFull;
+    }
+
+    /**
+     * @param advisoryWhenFull the advisoryWhenFull to set
+     */
+    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
+        this.advisoryWhenFull = advisoryWhenFull;
+    }
+
+    /**
+     * @return the advisoryForDelivery
+     */
+    public boolean isAdvisoryForDelivery() {
+        return advisoryForDelivery;
+    }
+
+    /**
+     * @param advisoryForDelivery the advisoryForDelivery to set
+     */
+    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
+        this.advisoryForDelivery = advisoryForDelivery;
+    }
+
+    /**
+     * @return the advisoryForConsumed
+     */
+    public boolean isAdvisoryForConsumed() {
+        return advisoryForConsumed;
+    }
+
+    /**
+     * @param advisoryForConsumed the advisoryForConsumed to set
+     */
+    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
+        this.advisoryForConsumed = advisoryForConsumed;
+    }
+    
+    /**
+     * @return the advisdoryForFastProducers
+     */
+    public boolean isAdvisdoryForFastProducers() {
+        return advisdoryForFastProducers;
+    }
+
+    /**
+     * @param advisdoryForFastProducers the advisdoryForFastProducers to set
+     */
+    public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
+        this.advisdoryForFastProducers = advisdoryForFastProducers;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=659203&r1=659202&r2=659203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Thu May 22 11:56:03 2008
@@ -22,6 +22,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import javax.jms.JMSException;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.advisory.AdvisorySupport;
@@ -92,6 +93,7 @@
     private BrokerId[] cluster;
 
     public abstract Message copy();
+    public abstract void clearBody() throws JMSException;
 
     protected void copy(Message copy) {
         super.copy(copy);

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java?rev=659203&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java Thu May 22 11:56:03 2008
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class AdvisoryTests extends TestCase {
+    protected static final int MESSAGE_COUNT = 2000;
+    protected BrokerService broker;
+    protected Connection connection;
+    protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+    protected int topicCount;
+   
+
+    public void testNoSlowConsumerAdvisory() throws Exception {
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = s.createQueue(getClass().getName());
+        MessageConsumer consumer = s.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+            }
+        });
+        Topic advisoryTopic = AdvisorySupport
+                .getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
+        s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        // start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(queue);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            BytesMessage m = s.createBytesMessage();
+            m.writeBytes(new byte[1024]);
+            producer.send(m);
+        }
+        Message msg = advisoryConsumer.receive(1000);
+        assertNull(msg);
+    }
+    
+    public void testSlowConsumerAdvisory() throws Exception {
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = s.createQueue(getClass().getName());
+        MessageConsumer consumer = s.createConsumer(queue);
+        
+        Topic advisoryTopic = AdvisorySupport
+                .getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
+        s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        // start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(queue);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            BytesMessage m = s.createBytesMessage();
+            m.writeBytes(new byte[1024]);
+            producer.send(m);
+        }
+        Message msg = advisoryConsumer.receive(1000);
+        assertNotNull(msg);
+    }
+    
+    public void testMessageDeliveryAdvisory() throws Exception {        
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = s.createQueue(getClass().getName());
+        MessageConsumer consumer = s.createConsumer(queue);
+                
+        Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        //start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(queue);
+        
+        BytesMessage m = s.createBytesMessage();
+        m.writeBytes(new byte[1024]);
+        producer.send(m);
+        
+        Message msg = advisoryConsumer.receive(1000);
+        assertNotNull(msg);
+    }
+    
+    public void testMessageConsumedAdvisory() throws Exception {        
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = s.createQueue(getClass().getName());
+        MessageConsumer consumer = s.createConsumer(queue);
+                
+        Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        //start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(queue);
+        
+        BytesMessage m = s.createBytesMessage();
+        m.writeBytes(new byte[1024]);
+        producer.send(m);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        
+        msg = advisoryConsumer.receive(1000);
+        assertNotNull(msg);
+    }
+    
+    public void testMessageExpiredAdvisory() throws Exception {
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = s.createQueue(getClass().getName());
+        MessageConsumer consumer = s.createConsumer(queue);
+                
+        Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        //start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(queue);
+        producer.setTimeToLive(1);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            BytesMessage m = s.createBytesMessage();
+            m.writeBytes(new byte[1024]);
+            producer.send(m);
+        }
+                
+        Message msg = advisoryConsumer.receive(2000);
+        assertNotNull(msg);
+    }
+    
+    public void xtestMessageDiscardedAdvisory() throws Exception {
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = s.createTopic(getClass().getName());
+        MessageConsumer consumer = s.createConsumer(topic);
+                
+        Topic advisoryTopic = AdvisorySupport.getMessageDiscardedAdvisoryTopic((ActiveMQDestination) topic);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        //start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(topic);
+        int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2);
+        for (int i = 0; i < count; i++) {
+            BytesMessage m = s.createBytesMessage();
+            producer.send(m);
+        }
+                
+        Message msg = advisoryConsumer.receive(1000);
+        assertNotNull(msg);
+    }
+
+   
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        ConnectionFactory factory = createConnectionFactory();
+        connection = factory.createConnection();
+        connection.start();
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        connection.close();
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory()
+            throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
+                ActiveMQConnection.DEFAULT_BROKER_URL);
+        return cf;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setPersistent(false);
+        PolicyEntry policy = new PolicyEntry();
+        policy.setAdvisdoryForFastProducers(true);
+        policy.setAdvisoryForConsumed(true);
+        policy.setAdvisoryForDelivery(true);
+        policy.setAdvisoryForDiscardingMessages(true);
+        policy.setAdvisoryForSlowConsumers(true);
+        policy.setAdvisoryWhenFull(true);
+        policy.setProducerFlowControl(false);
+        ConstantPendingMessageLimitStrategy strategy  = new ConstantPendingMessageLimitStrategy();
+        strategy.setLimit(10);
+        policy.setPendingMessageLimitStrategy(strategy);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        answer.setDestinationPolicy(pMap);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message