activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1301565 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/TopicSubscription.java test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
Date Fri, 16 Mar 2012 14:55:38 GMT
Author: gtully
Date: Fri Mar 16 14:55:38 2012
New Revision: 1301565

URL: http://svn.apache.org/viewvc?rev=1301565&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3733: Topic subscriber is assumed to be slow consumer
when prefetch is set to one. Thanks for the great test case. Fixed up the logic used to determine
slowness of a sub to take into account the pending messages and prefetch. It is now only applicable
when prefetch > 1 and the pending message strategy keeps messages in memory

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1301565&r1=1301564&r2=1301565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Fri Mar 16 14:55:38 2012
@@ -99,12 +99,14 @@ public class TopicSubscription extends A
             dispatch(node);
             setSlowConsumer(false);
         } else {
-            //we are slow
-            if(!isSlowConsumer()) {
-                LOG.warn(toString() + ": has reached its prefetch limit without an ack, it
appears to be slow");
-                setSlowConsumer(true);
-                for (Destination dest: destinations) {
-                    dest.slowConsumer(getContext(), this);
+            if ( info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()
) {
+                //we are slow
+                if(!isSlowConsumer()) {
+                    LOG.warn(toString() + ": has twice its prefetch limit pending, without
an ack; it appears to be slow");
+                    setSlowConsumer(true);
+                    for (Destination dest: destinations) {
+                        dest.slowConsumer(getContext(), this);
+                    }
                 }
             }
             if (maximumPendingMessages != 0) {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java?rev=1301565&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
Fri Mar 16 14:55:38 2012
@@ -0,0 +1,111 @@
+package org.apache.activemq.usecases;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.Assert;
+
+
+/**
+ * Checks to see if "slow consumer advisory messages" are generated when 
+ * small number of messages (2) are published to a topic which has a subscriber 
+ * with a prefetch of one set.
+ * 
+ */
+
+public class TopicSubscriptionSlowConsumerTest extends TestCase {
+
+	private static final String TOPIC_NAME = "slow.consumer";
+	Connection connection;
+	private Session session;
+	private ActiveMQTopic destination;
+	private MessageProducer producer;
+	private MessageConsumer consumer;
+	private BrokerService brokerService;
+
+	
+	public void setUp() throws Exception {
+
+		brokerService = createBroker();
+		
+		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+		
+		activeMQConnectionFactory.setWatchTopicAdvisories(true);
+		connection = activeMQConnectionFactory.createConnection();
+		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+		destination = new ActiveMQTopic(TOPIC_NAME);
+		producer = session.createProducer(destination);
+		
+		connection.start();
+	}
+
+	
+	
+	public void testPrefetchValueOne() throws Exception{
+		
+		ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME+"?consumer.prefetchSize=1");
+		consumer = session.createConsumer(consumerDestination);
+		
+		//add a consumer to the slow consumer advisory topic. 
+		ActiveMQTopic slowConsumerAdvisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
+		MessageConsumer slowConsumerAdvisory = session.createConsumer(slowConsumerAdvisoryTopic);
+		
+		//publish 2 messages
+		Message txtMessage = session.createTextMessage("Sample Text Message");
+		for(int i= 0; i<2; i++){
+			producer.send(txtMessage);
+		}
+		
+		//consume 2 messages
+		for(int i= 0; i<2; i++){
+			Message receivedMsg = consumer.receive(100);
+			Assert.assertNotNull("received msg "+i+" should not be null",receivedMsg);
+		}
+
+		//check for "slow consumer" advisory message
+		Message slowAdvisoryMessage = slowConsumerAdvisory.receive(100);
+		Assert.assertNull( "should not have received a slow consumer advisory message",slowAdvisoryMessage);
+		
+	}
+
+	
+
+	public void tearDown() throws Exception {
+		consumer.close();
+		producer.close();
+		session.close();
+		connection.close();
+		brokerService.stop();
+	}
+	
+	
+	//helper method to create a broker with slow consumer advisory turned on
+	private BrokerService createBroker() throws Exception {
+		BrokerService broker = new BrokerService();
+		broker.setBrokerName("localhost");
+		broker.setUseJmx(true);
+		broker.setDeleteAllMessagesOnStartup(true);
+		broker.addConnector("vm://localhost");
+
+		PolicyMap policyMap = new PolicyMap();
+		PolicyEntry defaultEntry = new PolicyEntry();
+		defaultEntry.setAdvisoryForSlowConsumers(true);
+
+		policyMap.setDefaultEntry(defaultEntry);
+
+		broker.setDestinationPolicy(policyMap);
+		broker.start();
+		broker.waitUntilStarted();
+		return broker;
+	}
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message