activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r916255 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/util/ test/java/org/apache/activ...
Date Thu, 25 Feb 2010 11:39:53 GMT
Author: gtully
Date: Thu Feb 25 11:39:53 2010
New Revision: 916255

URL: http://svn.apache.org/viewvc?rev=916255&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2628 - add consumerId to message discarded
advisory to allow correlation with the dlq, change to the brokerservice advisory api

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=916255&r1=916254&r2=916255&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Thu Feb 25 11:39:53 2010
@@ -27,6 +27,7 @@
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -309,14 +310,19 @@
     }
     
     @Override
-    public void messageDiscarded(ConnectionContext context, MessageReference messageReference)
{
-        super.messageDiscarded(context, messageReference);
+    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference
messageReference) {
+        super.messageDiscarded(context, sub, messageReference);
         try {
             if (!messageReference.isAdvisory()) {
                 ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
                 Message payload = messageReference.getMessage().copy();
                 payload.clearBody();
-                fireAdvisory(context, topic,payload);
+                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+                if (sub instanceof TopicSubscription) {
+                    advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT,
((TopicSubscription)sub).discarded());
+                }
+                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID,
sub.getConsumerInfo().getConsumerId().toString());
+                fireAdvisory(context, topic, payload, null, advisoryMessage);
             }
         } catch (Exception e) {
             LOG.warn("Failed to fire message discarded advisory");
@@ -403,7 +409,7 @@
                 count += dest.getDestinationStatistics().getConsumers().getCount();
             }
         }
-        advisoryMessage.setIntProperty("consumerCount", count);
+        advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count);
         
         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=916255&r1=916254&r2=916255&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
Thu Feb 25 11:39:53 2010
@@ -55,6 +55,9 @@
     public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId";
     public static final String MSG_PROPERTY_PRODUCER_ID = "producerId";
     public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
+    public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
+    public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
+    
     public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
             TEMP_QUEUE_ADVISORY_TOPIC + "," + TEMP_TOPIC_ADVISORY_TOPIC);
     private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=916255&r1=916254&r2=916255&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Thu
Feb 25 11:39:53 2010
@@ -339,9 +339,10 @@
      * Called when a message is discarded - e.g. running low on memory
      * This will happen only if the policy is enabled - e.g. non durable topics
      * @param context
+     * @param sub 
      * @param messageReference
      */
-    void messageDiscarded(ConnectionContext context, MessageReference messageReference);
+    void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference);
     
     /**
      * Called when there is a slow consumer

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=916255&r1=916254&r2=916255&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Thu Feb 25 11:39:53 2010
@@ -284,8 +284,8 @@
         next.messageDelivered(context, messageReference);
     }
 
-    public void messageDiscarded(ConnectionContext context,MessageReference messageReference)
{
-        next.messageDiscarded(context, messageReference);
+    public void messageDiscarded(ConnectionContext context,Subscription sub, MessageReference
messageReference) {
+        next.messageDiscarded(context, sub, messageReference);
     }
 
     public void slowConsumer(ConnectionContext context, Destination destination,Subscription
subs) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=916255&r1=916254&r2=916255&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Thu Feb 25 11:39:53 2010
@@ -271,7 +271,7 @@
     public void messageDelivered(ConnectionContext context,MessageReference messageReference)
{
     }
 
-    public void messageDiscarded(ConnectionContext context,MessageReference messageReference)
{
+    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference
messageReference) {
     }
 
     public void slowConsumer(ConnectionContext context,Destination destination, Subscription
subs) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=916255&r1=916254&r2=916255&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Thu Feb 25 11:39:53 2010
@@ -286,7 +286,7 @@
         throw new BrokerStoppedException(this.message);
     }
 
-    public void messageDiscarded(ConnectionContext context,MessageReference messageReference)
{
+    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference
messageReference) {
         throw new BrokerStoppedException(this.message);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=916255&r1=916254&r2=916255&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Thu Feb 25 11:39:53 2010
@@ -295,8 +295,8 @@
         getNext().messageDelivered(context, messageReference);
     }
 
-    public void messageDiscarded(ConnectionContext context,MessageReference messageReference)
{
-        getNext().messageDiscarded(context, messageReference);
+    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference
messageReference) {
+        getNext().messageDiscarded(context, sub, messageReference);
     }
 
     public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs)
{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=916255&r1=916254&r2=916255&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Thu Feb 25 11:39:53 2010
@@ -429,9 +429,9 @@
      * @param context
      * @param messageReference
      */
-    public void messageDiscarded(ConnectionContext context, MessageReference messageReference)
{
+    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference
messageReference) {
         if (advisoryForDiscardingMessages) {
-            broker.messageDiscarded(context, messageReference);
+            broker.messageDiscarded(context, sub, messageReference);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=916255&r1=916254&r2=916255&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Thu Feb 25 11:39:53 2010
@@ -177,8 +177,9 @@
      * 
      * @param context
      * @param messageReference
+     * @param sub 
      */
-    void messageDiscarded(ConnectionContext context, MessageReference messageReference);
+    void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference);
 
     /**
      * Called when there is a slow consumer

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=916255&r1=916254&r2=916255&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Thu Feb 25 11:39:53 2010
@@ -239,8 +239,8 @@
         next.messageDelivered(context, messageReference);
     }
 
-    public void messageDiscarded(ConnectionContext context, MessageReference messageReference)
{
-        next.messageDiscarded(context, messageReference);
+    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference
messageReference) {
+        next.messageDiscarded(context, sub, messageReference);
     }
 
     public void slowConsumer(ConnectionContext context, Subscription subs) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=916255&r1=916254&r2=916255&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Thu Feb 25 11:39:53 2010
@@ -463,7 +463,7 @@
         }
         Destination dest = message.getRegionDestination();
         if (dest != null) {
-            dest.messageDiscarded(getContext(), message);
+            dest.messageDiscarded(getContext(), this, message);
         }
         broker.getRoot().sendToDeadLetterQueue(getContext(), message);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java?rev=916255&r1=916254&r2=916255&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
Thu Feb 25 11:39:53 2010
@@ -579,7 +579,7 @@
     }
 
     @Override
-    public void messageDiscarded(ConnectionContext context,
+    public void messageDiscarded(ConnectionContext context,  Subscription sub,
             MessageReference messageReference) {
         if (isLogAll() || isLogInternalEvents()) {
             String msg = "Unable to display message.";
@@ -589,7 +589,7 @@
             }
             LOG.info("Message discarded : " + msg);
         }
-        super.messageDiscarded(context, messageReference);
+        super.messageDiscarded(context, sub, messageReference);
     }
 
     @Override

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java?rev=916255&r1=916254&r2=916255&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
Thu Feb 25 11:39:53 2010
@@ -18,6 +18,7 @@
 
 import static junit.framework.Assert.fail;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
@@ -30,6 +31,7 @@
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -37,6 +39,7 @@
 import javax.jms.Session;
 import javax.jms.Topic;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
@@ -47,6 +50,8 @@
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.ThreadTracker;
 import org.apache.activemq.util.Wait;
 import org.apache.commons.logging.Log;
@@ -61,7 +66,8 @@
     Connection connection;
     private Session session;
     private Topic destination;
-    protected int numMessages = 4000;
+    private final String destinationName = "verifyEvection";
+    protected int numMessages = 2000;
     protected String payload = new String(new byte[1024*2]);
 
     public void setUp(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) throws
Exception {
@@ -71,7 +77,7 @@
         connection = connectionFactory.createConnection();
         connection.start();
         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        destination = session.createTopic("verifyEvection");
+        destination = session.createTopic(destinationName);
     }
     
     @After
@@ -83,16 +89,69 @@
     
     @Test
     public void testMessageEvictionMemoryUsageFileCursor() throws Exception {
-        doTestMessageEvictionMemoryUsage(new FilePendingSubscriberMessageStoragePolicy());
+        setUp(new FilePendingSubscriberMessageStoragePolicy());
+        doTestMessageEvictionMemoryUsage();
     }
     
     @Test
     public void testMessageEvictionMemoryUsageVmCursor() throws Exception {
-        doTestMessageEvictionMemoryUsage(new VMPendingSubscriberMessageStoragePolicy());
+        setUp(new VMPendingSubscriberMessageStoragePolicy());
+        doTestMessageEvictionMemoryUsage();
     }
     
-    public void doTestMessageEvictionMemoryUsage(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy)
throws Exception {
-        setUp(pendingSubscriberPolicy);
+    @Test
+    public void testMessageEvictionDiscardedAdvisory() throws Exception {
+        setUp(new VMPendingSubscriberMessageStoragePolicy());
+        
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        final CountDownLatch consumerRegistered = new CountDownLatch(1);
+        final CountDownLatch gotAdvisory = new CountDownLatch(1);
+        final CountDownLatch advisoryIsGood = new CountDownLatch(1);
+        
+        executor.execute(new Runnable() {
+            public void run() {
+                try {
+                    ActiveMQTopic discardedAdvisoryDestination = 
+                        AdvisorySupport.getMessageDiscardedAdvisoryTopic(destination);
+                    // use separate session rather than asyncDispatch on consumer session

+                    // as we want consumer session to block
+                    Session advisorySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    final MessageConsumer consumer = advisorySession.createConsumer(discardedAdvisoryDestination);
+                    consumer.setMessageListener(new MessageListener() {
+                        public void onMessage(Message message) {
+                            try {
+                                LOG.info("advisory:" + message);
+                                ActiveMQMessage activeMQMessage = (ActiveMQMessage) message;
+                                assertNotNull(activeMQMessage.getStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID));
+                                assertEquals(1, activeMQMessage.getIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT));
+                                message.acknowledge();
+                                advisoryIsGood.countDown();
+                            } catch (JMSException e) {
+                                e.printStackTrace();
+                                fail(e.toString());
+                            } finally {
+                                gotAdvisory.countDown();
+                            }
+                        }           
+                    });
+                    consumerRegistered.countDown();
+                    gotAdvisory.await(120, TimeUnit.SECONDS);
+                    consumer.close();
+                    advisorySession.close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    fail(e.toString());
+                }
+            }
+        });
+        assertTrue("we have an advisory consumer", consumerRegistered.await(60, TimeUnit.SECONDS));
+        doTestMessageEvictionMemoryUsage();
+        assertTrue("got an advisory for discarded", gotAdvisory.await(0, TimeUnit.SECONDS));
+        assertTrue("advisory is good",advisoryIsGood.await(0, TimeUnit.SECONDS));
+    }
+    
+    public void doTestMessageEvictionMemoryUsage() throws Exception {
+        
         ExecutorService executor = Executors.newCachedThreadPool();
         final CountDownLatch doAck = new CountDownLatch(1);
         final CountDownLatch consumerRegistered = new CountDownLatch(1);
@@ -147,7 +206,7 @@
             }
         });
         
-        assertTrue("messages sending done", sendDone.await(90, TimeUnit.SECONDS));
+        assertTrue("messages sending done", sendDone.await(180, TimeUnit.SECONDS));
         assertEquals("all message were sent", numMessages, sent.get());
         
         doAck.countDown();
@@ -175,6 +234,8 @@
         final PolicyEntry entry = new PolicyEntry();
         entry.setTopic(">");
         
+        entry.setAdvisoryForDiscardingMessages(true);
+        
         // so consumer does not get over run while blocked limit the prefetch
         entry.setTopicPrefetch(50);
         
@@ -204,8 +265,6 @@
         policyMap.setPolicyEntries(policyEntries);
         brokerService.setDestinationPolicy(policyMap);
         
-        brokerService.setAdvisorySupport(false);
-        
         return brokerService;
     }
 



Mime
View raw message