activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Maarten Dirkse (JIRA)" <j...@apache.org>
Subject [jira] Issue Comment Edited: (AMQ-2683) Producer Flow Control Does Not Seem to Work with Topics
Date Thu, 23 Dec 2010 15:49:01 GMT

    [ https://issues.apache.org/jira/browse/AMQ-2683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12974642#action_12974642
] 

Maarten Dirkse edited comment on AMQ-2683 at 12/23/10 10:48 AM:
----------------------------------------------------------------

Well, it's not a topic version of the PFC test in the AMQ source, but I'm pretty confident
it does demonstrate that PFC doesn't work for topics.
If you run the code you'll see the producer halt when PFC kicks in, and never start up again,
even when the consumer has consumed all outstanding messages. If you change the Destination
from a topic to a queue, PFC does work as expected. 

{code}
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class BrokerTest implements MessageListener {
  private static final Log log = LogFactory.getLog(BrokerTest.class);
  private static final String brokerName = "testBroker";
  private static final String brokerUrl = "vm://" + brokerName;
  private static final int destinationMemLimit = 2097152; // 2MB
  private static final AtomicLong produced = new AtomicLong();
  private static final AtomicLong consumed = new AtomicLong();

  public static void main(String[] args) throws Exception {
    // Setup and start the broker
    BrokerService broker = new BrokerService();
    broker.setBrokerName(brokerName);
    broker.setPersistent(false);
    broker.setSchedulerSupport(false);
    broker.setUseJmx(false);
    broker.setUseShutdownHook(false);
    broker.addConnector(brokerUrl);

    // Setup the destination policy
    PolicyMap pm = new PolicyMap();

    // Setup the topic destination policy
    PolicyEntry tpe = new PolicyEntry();
    tpe.setTopic(">");
    tpe.setMemoryLimit(destinationMemLimit);
    tpe.setProducerFlowControl(true);

    // Setup the topic destination policy
    PolicyEntry qpe = new PolicyEntry();
    qpe.setQueue(">");
    qpe.setMemoryLimit(destinationMemLimit);
    qpe.setProducerFlowControl(true);
    qpe.setQueuePrefetch(1);

    pm.setPolicyEntries(Arrays.asList(new PolicyEntry[] { tpe, qpe }));

    broker.setDestinationPolicy(pm);

    // Start the broker
    broker.start();

    Destination destination = new ActiveMQTopic("test");
    //Destination destination = new ActiveMQQueue("test");

    // Create the connection factory
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
    connectionFactory.setAlwaysSyncSend(true);
    connectionFactory.setProducerWindowSize(1024);

    // Start the test destination listener
    Connection c = connectionFactory.createConnection();
    c.start();
    c.createSession(false, 1).createConsumer(destination).setMessageListener(new BrokerTest());

    // Start producing the test messages
    Session s = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageProducer p = s.createProducer(destination);

    for (long i = 0; i < 2000000L; i++) {
      p.send(s.createTextMessage("test"));

      long count = produced.incrementAndGet();
      if (count % 1000 == 0) {
        log.debug("Produced " + count / 1000 + "k messages");
      }
    }
  }

  @Override
  public void onMessage(Message arg0) {
    try {
      Thread.sleep(1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    long count = consumed.incrementAndGet();
    if (count % 1000 == 0) {
      log.debug("\tConsumed " + count / 1000 + "k messages");
    }
  }
}
{code}

      was (Author: mdirkse):
    Well, it's not a topic version of the PFC test in the AMQ source, but I'm pretty confident
it does demonstrate that PFC doesn't work for topics.
If you run the code you'll see the producer halt when PFC kicks in, and never start up again,
even when the consumer has consumed all outstanding messages. If you change the Destination
from a topic to a queue, PFC does work as expected. 

{code}
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class BrokerTest implements MessageListener {
  private static final Log log = LogFactory.getLog(BrokerTest.class);
  private static final String brokerName = "testBroker";
  private static final String brokerUrl = "vm://testBroker";
  private static final int destinationMemLimit = 2097152; // 2MB
  private static final AtomicLong produced = new AtomicLong();
  private static final AtomicLong consumed = new AtomicLong();

  public static void main(String[] args) throws Exception {
    // Setup and start the broker
    BrokerService broker = new BrokerService();
    broker.setBrokerName(brokerName);
    broker.setPersistent(false);
    broker.setSchedulerSupport(false);
    broker.setUseJmx(false);
    broker.setUseShutdownHook(false);
    broker.addConnector(brokerUrl);

    // Setup the destination policy
    PolicyMap pm = new PolicyMap();

    // Setup the topic destination policy
    PolicyEntry tpe = new PolicyEntry();
    tpe.setTopic(">");
    tpe.setMemoryLimit(destinationMemLimit);
    tpe.setProducerFlowControl(true);

    // Setup the topic destination policy
    PolicyEntry qpe = new PolicyEntry();
    qpe.setQueue(">");
    qpe.setMemoryLimit(destinationMemLimit);
    qpe.setProducerFlowControl(true);
    qpe.setQueuePrefetch(1);

    pm.setPolicyEntries(Arrays.asList(new PolicyEntry[] { tpe, qpe }));

    broker.setDestinationPolicy(pm);

    // Start the broker
    broker.start();

    Destination destination = new ActiveMQTopic("test");
    //Destination destination = new ActiveMQQueue("test");

    // Create the connection factory
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
    connectionFactory.setAlwaysSyncSend(true);
    connectionFactory.setProducerWindowSize(1024);

    // Start the test destination listener
    Connection c = connectionFactory.createConnection();
    c.start();
    c.createSession(false, 1).createConsumer(destination).setMessageListener(new BrokerTest());

    // Start producing the test messages
    Session s = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageProducer p = s.createProducer(destination);

    for (long i = 0; i < 2000000L; i++) {
      p.send(s.createTextMessage("test"));

      long count = produced.incrementAndGet();
      if (count % 1000 == 0) {
        log.debug("Produced " + count / 1000 + "k messages");
      }
    }
  }

  @Override
  public void onMessage(Message arg0) {
    try {
      Thread.sleep(1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    long count = consumed.incrementAndGet();
    if (count % 1000 == 0) {
      log.debug("\tConsumed " + count / 1000 + "k messages");
    }
  }
}
{code}
  
> Producer Flow Control Does Not Seem to Work with Topics
> -------------------------------------------------------
>
>                 Key: AMQ-2683
>                 URL: https://issues.apache.org/jira/browse/AMQ-2683
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.3.0, 5.3.1
>         Environment: Windows 2008 Server, Sun Java 6. 
>            Reporter: Brad Willard
>            Priority: Minor
>             Fix For: 5.5.0
>
>
> I have multiple producers posting messages to queues and adding statistical information
to a topic.  I have one consumer of that topic subscribing and taking those messages and monitors
for problems.  The consumer is slower than publishing to the topic.  I have flow control enabled
and I expected that once the memory limit of the Consumers dispatch queue was reached it would
throttle the publishers to the topic, but instead it seems to lock everything up.
> The message producers are putting messages to each queue and the topic on separate sessions.
 The broker basically stops once it posts the message that it's going to throttle producers.
 The topic messages are messages with text attributes, they are non persistent and posted
non-transactional using the AUTO_ACKNOWLEDGE mode.
> All the producers and consumers use the same Connection factory with with these attributes
set.
> ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
> factory.setProducerWindowSize(1024 * 1024);
> factory.getPrefetchPolicy().setTopicPrefetch(10);
> My activemq config is as follows:
>     <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost"
dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true">
>  
>         <!--
> 			For better performances use VM cursor and small memory limit.
> 			For more information, see:
>             
>             http://activemq.apache.org/message-cursors.html
>             
>             Also, if your producer is "hanging", it's probably due to producer flow control.
>             For more information, see:
>             http://activemq.apache.org/producer-flow-control.html
>         -->
>               
>         <destinationPolicy>
>             <policyMap>
>               <policyEntries>
>                 <policyEntry topic=">" producerFlowControl="true" memoryLimit="15mb">
>                   <pendingSubscriberPolicy>
>                     <vmCursor />
>                   </pendingSubscriberPolicy>
>                 </policyEntry>
>                 <policyEntry queue=">" producerFlowControl="true" memoryLimit="15mb">
>                   <!-- Use VM cursor for better latency
>                        For more information, see:
>                        
>                        http://activemq.apache.org/message-cursors.html
>                        
>                   <pendingQueuePolicy>
>                     <vmQueueCursor/>
>                   </pendingQueuePolicy>
>                   -->
>                 </policyEntry>
>               </policyEntries>
>             </policyMap>
>         </destinationPolicy> 
>  
>         
>         <!-- 
>             The managementContext is used to configure how ActiveMQ is exposed in 
>             JMX. By default, ActiveMQ uses the MBean server that is started by 
>             the JVM. For more information, see: 
>             
>             http://activemq.apache.org/jmx.html 
>         -->
>         <managementContext>
>             <managementContext createConnector="false"/>
>         </managementContext>
>         <!-- 
>             Configure message persistence for the broker. The default persistence
>             mechanism is the KahaDB store (identified by the kahaDB tag). 
>             For more information, see: 
>             
>             http://activemq.apache.org/persistence.html 
>         -->
>         <persistenceAdapter>
>             <kahaDB directory="${activemq.base}/data/kahadb" journalMaxFileLength="5
mb"/>
>         </persistenceAdapter>
>         
>         
>         <!--
>             The systemUsage controls the maximum amount of space the broker will 
>             use before slowing down producers. For more information, see:
>             
>             http://activemq.apache.org/producer-flow-control.html
>         -->     
>         <systemUsage>
>             <systemUsage>
>                 <memoryUsage>
>                     <memoryUsage limit="200 mb"/>
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="50 gb"/>
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="100 mb"/>
>                 </tempUsage>
>             </systemUsage>
>         </systemUsage>
> 	
> 		  
>         <!-- 
>             The transport connectors expose ActiveMQ over a given protocol to
>             clients and other brokers. For more information, see: 
>             
>             http://activemq.apache.org/configuring-transports.html 
>         -->
>         <transportConnectors>
>             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
>         </transportConnectors>
>     </broker>

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message