activemq-dev mailing list archives

From "James Furness (Updated) (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AMQ-3607) Setting OptimiseAcknowledge on a queue with a prefetch limit causes normal/fast consumers to miss messages when a slow consumer is blocking
Date Wed, 23 Nov 2011 18:27:40 GMT

     [ https://issues.apache.org/jira/browse/AMQ-3607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

James Furness updated AMQ-3607:
-------------------------------

    Attachment: ActiveMQSlowConsumerManualTest.java

Test case attached as requested
                
> Setting OptimiseAcknowledge on a queue with a prefetch limit causes normal/fast consumers to miss messages when a slow consumer is blocking
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3607
>                 URL: https://issues.apache.org/jira/browse/AMQ-3607
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.0
>         Environment: Java: 1.6.0_26-b03-383.jdk
>            Reporter: James Furness
>         Attachments: ActiveMQSlowConsumerManualTest.java
>
>
> The test case below exercises slow consumer handling with a variety of topic policies and
> SessionFactory/ConnectionFactory settings. The expectation is that a normal (i.e. fast) consumer
> continues to receive messages whilst a slow consumer is blocking.
> Without a prefetch limit, the expected behaviour is seen with setOptimizeAcknowledge set to
> either true or false.
> If a prefetch limit is set, setOptimizeAcknowledge(true) causes the normal/fast consumer
> to miss messages whilst the slow consumer is blocking.
> It would be useful to enable setOptimizeAcknowledge for performance reasons, but a prefetch
> limit must also be set in order to trigger the SlowConsumerStrategy/MessageEvictionStrategySupport
> logic.
> {code:title=testDefaultSettings}
> Publisher: Send 0
> SlowConsumer: Receive 0
> FastConsumer: Receive 0
> testDefaultSettings: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testDefaultSettings: Whilst slow consumer blocked:
> 		- SlowConsumer Received: 1 [0]
> 		- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testDefaultSettings: After slow consumer unblocked:
> 		- SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> 		- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> {code}
> {code:title=testDefaultSettingsWithOptimiseAcknowledge}
> testDefaultSettingsWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testDefaultSettingsWithOptimiseAcknowledge: Whilst slow consumer blocked:
> 		- SlowConsumer Received: 1 [0]
> 		- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testDefaultSettingsWithOptimiseAcknowledge: After slow consumer unblocked:
> 		- SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> 		- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> {code}
> {code:title=testBounded}
> testBounded: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testBounded: Whilst slow consumer blocked:
> 		- SlowConsumer Received: 1 [0]
> 		- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testBounded: After slow consumer unblocked:
> 		- SlowConsumer Received: 10 [0, 1, 2, 3, 4, 25, 26, 27, 28, 29]
> 		- FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> {code}
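
The slow consumer's final list in testBounded ([0, 1, 2, 3, 4, 25, 26, 27, 28, 29]) can be reproduced on paper. The sketch below is a plain-Java model, not ActiveMQ code. It assumes that the blocked subscriber holds the first `prefetch` messages as already dispatched, that at most `pendingLimit` further messages are kept pending, and that the oldest pending message is discarded once that limit is exceeded. The class and method names are hypothetical; the testBounded output is consistent with this model, but the model says nothing about how the broker implements it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BoundedSubscriptionModel {

    // Hypothetical model (not ActiveMQ code): returns the message ids a consumer
    // would eventually see if it stayed blocked while `sent` messages were published.
    static List<Integer> deliveredToBlockedConsumer(int sent, int prefetch, int pendingLimit) {
        List<Integer> dispatched = new ArrayList<Integer>();
        Deque<Integer> pending = new ArrayDeque<Integer>();
        for (int id = 0; id < sent; id++) {
            if (dispatched.size() < prefetch) {
                // Still room in the prefetch window: the message goes to the client.
                dispatched.add(id);
            } else {
                // Prefetch window is full: queue the message as pending and
                // discard the oldest pending message when over the limit.
                pending.addLast(id);
                if (pending.size() > pendingLimit) {
                    pending.removeFirst();
                }
            }
        }
        List<Integer> result = new ArrayList<Integer>(dispatched);
        result.addAll(pending);
        return result;
    }

    public static void main(String[] args) {
        // Same numbers as testBounded: 30 messages, prefetch 5, pending limit 5.
        System.out.println(deliveredToBlockedConsumer(30, 5, 5));
    }
}
```

Under these assumptions the result has min(sent, prefetch + pendingLimit) entries, which is the count the test's second assertion checks for the slow consumer.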
> {code:title=testBoundedWithOptimiseAcknowledge}
> testBoundedWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testBoundedWithOptimiseAcknowledge: Whilst slow consumer blocked:
> 		- SlowConsumer Received: 1 [0]
> 		- FastConsumer Received: 5 [0, 1, 2, 3, 4]
> testBoundedWithOptimiseAcknowledge: After slow consumer unblocked:
> 		- SlowConsumer Received: 5 [0, 1, 2, 3, 4]
> 		- FastConsumer Received: 5 [0, 1, 2, 3, 4]
> java.lang.AssertionError: Fast consumer missed messages whilst slow consumer was blocking expected:<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]> but was:<[0, 1, 2, 3, 4]>
> {code}
> {code:title=ActiveMQSlowConsumerManualTest.java}
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
> import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
> import org.apache.activemq.broker.region.policy.PolicyEntry;
> import org.apache.activemq.broker.region.policy.PolicyMap;
> import org.apache.activemq.command.ActiveMQTopic;
> import org.junit.Assert;
> import org.junit.Ignore;
> import org.junit.Test;
> import javax.jms.Connection;
> import javax.jms.DeliveryMode;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.atomic.AtomicInteger;
> /**
>  * @author James Furness
>  */
> public class ActiveMQSlowConsumerManualTest {
>     private static final int PORT = 12345;
>     private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
>     private static final String URL = "nio://localhost:" + PORT + "?socket.tcpNoDelay=true";
>     @Test(timeout = 60000)
>     public void testDefaultSettings() throws Exception {
>         runTest("testDefaultSettings", 30, -1, -1, false, false, false, false);
>     }
>     @Test(timeout = 60000)
>     public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception {
>         runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, false, false, true, false);
>     }
>     @Test(timeout = 60000)
>     public void testBounded() throws Exception {
>         runTest("testBounded", 30, 5, 5, false, false, false, false);
>     }
>     @Test(timeout = 60000)
>     public void testBoundedWithOptimiseAcknowledge() throws Exception {
>         runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 5, false, false, true, false);
>     }
>     public void runTest(String name, int sendMessageCount, int prefetchLimit, int messageLimit,
>                         boolean evictOldestMessage, boolean disableFlowControl,
>                         boolean optimizeAcknowledge, boolean persistent) throws Exception {
>         BrokerService broker = createBroker(persistent);
>         broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit, messageLimit, evictOldestMessage, disableFlowControl));
>         broker.start();
>         // Slow consumer
>         Session slowConsumerSession = buildSession("SlowConsumer", URL, optimizeAcknowledge);
>         final CountDownLatch blockSlowConsumer = new CountDownLatch(1);
>         final AtomicInteger slowConsumerReceiveCount = new AtomicInteger();
>         final List<Integer> slowConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
>         MessageConsumer slowConsumer = createSubscriber(slowConsumerSession,
>                 new MessageListener() {
>                     @Override
>                     public void onMessage(Message message) {
>                         try {
>                             slowConsumerReceiveCount.incrementAndGet();
>                             int count = Integer.parseInt(((TextMessage) message).getText());
>                             if (slowConsumerReceived != null) slowConsumerReceived.add(count);
>                             if (count % 10000 == 0) System.out.println("SlowConsumer: Receive " + count);
>                             blockSlowConsumer.await();
>                         } catch (Exception ignored) {}
>                     }
>                 }
>         );
>         // Fast consumer
>         Session fastConsumerSession = buildSession("FastConsumer", URL, optimizeAcknowledge);
>         final AtomicInteger fastConsumerReceiveCount = new AtomicInteger();
>         final List<Integer> fastConsumerReceived = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
>         MessageConsumer fastConsumer = createSubscriber(fastConsumerSession,
>                 new MessageListener() {
>                     @Override
>                     public void onMessage(Message message) {
>                         try {
>                             fastConsumerReceiveCount.incrementAndGet();
>                             int count = Integer.parseInt(((TextMessage) message).getText());
>                             if (fastConsumerReceived != null) fastConsumerReceived.add(count);
>                             if (count % 10000 == 0) System.out.println("FastConsumer: Receive " + count);
>                         } catch (Exception ignored) {}
>                     }
>                 }
>         );
>         // Wait for consumers to connect
>         Thread.sleep(500);
>         // Publisher
>         AtomicInteger sentCount = new AtomicInteger();
>         List<Integer> sent = sendMessageCount <= 1000 ? new ArrayList<Integer>() : null;
>         Session publisherSession = buildSession("Publisher", URL, optimizeAcknowledge);
>         MessageProducer publisher = createPublisher(publisherSession);
>         for (int i = 0; i < sendMessageCount; i++) {
>             sentCount.incrementAndGet();
>             if (sent != null) sent.add(i);
>             if (i % 10000 == 0) System.out.println("Publisher: Send " + i);
>             publisher.send(publisherSession.createTextMessage(Integer.toString(i)));
>         }
>         // Wait for messages to arrive
>         Thread.sleep(500);
>         System.out.println(name + ": Publisher Sent: " + sentCount + " " + sent);
>         System.out.println(name + ": Whilst slow consumer blocked:");
>         System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
>         System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);
>         // Unblock slow consumer
>         blockSlowConsumer.countDown();
>         // Wait for messages to arrive
>         Thread.sleep(500);
>         System.out.println(name + ": After slow consumer unblocked:");
>         System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
>         System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);
>         System.out.println();
>         publisher.close();
>         publisherSession.close();
>         slowConsumer.close();
>         slowConsumerSession.close();
>         fastConsumer.close();
>         fastConsumerSession.close();
>         broker.stop();
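>         Assert.assertEquals("Fast consumer missed messages whilst slow consumer was blocking", sent, fastConsumerReceived);
>         // Use long arithmetic so prefetchLimit + Integer.MAX_VALUE cannot overflow, and compare
>         // against the counter so the check also works when the received list is null.
>         long slowConsumerLimit = (long) prefetchLimit + (messageLimit > 0 ? messageLimit : Integer.MAX_VALUE);
>         Assert.assertEquals("Slow consumer received incorrect message count", Math.min((long) sendMessageCount, slowConsumerLimit), slowConsumerReceiveCount.get());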
>     }
>     private static BrokerService createBroker(boolean persistent) throws Exception {
>         BrokerService broker = new BrokerService();
>         broker.setBrokerName("TestBroker");
>         broker.setPersistent(persistent);
>         broker.addConnector(URL);
>         return broker;
>     }
>     private static MessageConsumer createSubscriber(Session session, MessageListener messageListener) throws JMSException {
>         MessageConsumer consumer = session.createConsumer(TOPIC);
>         consumer.setMessageListener(messageListener);
>         return consumer;
>     }
>     private static MessageProducer createPublisher(Session session) throws JMSException {
>         MessageProducer producer = session.createProducer(TOPIC);
>         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>         return producer;
>     }
>     private static Session buildSession(String clientId, String url, boolean optimizeAcknowledge) throws JMSException {
>         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
>         connectionFactory.setCopyMessageOnSend(false);
>         connectionFactory.setDisableTimeStampsByDefault(true);
>         connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge);
>         Connection connection = connectionFactory.createConnection();
>         connection.setClientID(clientId);
>         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         connection.start();
>         return session;
>     }
>     private static PolicyMap buildPolicy(ActiveMQTopic topic, int prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean disableFlowControl) {
>         PolicyMap policyMap = new PolicyMap();
>         PolicyEntry policyEntry = new PolicyEntry();
>         if (evictOldestMessage) {
>             policyEntry.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
>         }
>         if (disableFlowControl) {
>             policyEntry.setProducerFlowControl(false);
>         }
>         if (prefetchLimit > 0) {
>             policyEntry.setTopicPrefetch(prefetchLimit);
>         }
>         if (messageLimit > 0) {
>             ConstantPendingMessageLimitStrategy messageLimitStrategy = new ConstantPendingMessageLimitStrategy();
>             messageLimitStrategy.setLimit(messageLimit);
>             policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy);
>         }
>         policyMap.put(topic, policyEntry);
>         return policyMap;
>     }
> }
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
