Return-Path: Delivered-To: apmail-activemq-dev-archive@www.apache.org Received: (qmail 62308 invoked from network); 23 Dec 2010 15:43:25 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 23 Dec 2010 15:43:25 -0000 Received: (qmail 72471 invoked by uid 500); 23 Dec 2010 15:43:25 -0000 Delivered-To: apmail-activemq-dev-archive@activemq.apache.org Received: (qmail 72271 invoked by uid 500); 23 Dec 2010 15:43:25 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 72255 invoked by uid 99); 23 Dec 2010 15:43:24 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Dec 2010 15:43:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.22] (HELO thor.apache.org) (140.211.11.22) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Dec 2010 15:43:23 +0000 Received: from thor (localhost [127.0.0.1]) by thor.apache.org (8.13.8+Sun/8.13.8) with ESMTP id oBNFh249013816 for ; Thu, 23 Dec 2010 15:43:03 GMT Message-ID: <12511173.289041293118982741.JavaMail.jira@thor> Date: Thu, 23 Dec 2010 10:43:02 -0500 (EST) From: "Maarten Dirkse (JIRA)" To: dev@activemq.apache.org Subject: [jira] Commented: (AMQ-2683) Producer Flow Control Does Not Seem to Work with Topics MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/AMQ-2683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12974642#action_12974642 ] Maarten Dirkse commented on AMQ-2683: ------------------------------------- Well, it's not a topic version of the PFC test in the AMQ source, but I'm pretty confident it does demonstrate that PFC doesn't work for topics. If you run the code you'll see the producer halt when PFC kicks in, and never start up again, even when the consumer has consumed all outstanding messages. If you change the Destination from a topic to a queue, PFC does work as excepted. {code} import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQTopic; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class BrokerTest implements MessageListener { private static final Log log = LogFactory.getLog(BrokerTest.class); private static final String brokerName = "testBroker"; private static final String brokerUrl = "vm://testBroker"; private static final int destinationMemLimit = 2097152; // 2MB private static final AtomicLong produced = new AtomicLong(); private static final AtomicLong consumed = new AtomicLong(); public static void main(String[] args) throws Exception { // Setup and start the broker BrokerService broker = new BrokerService(); broker.setBrokerName(brokerName); broker.setPersistent(false); broker.setSchedulerSupport(false); broker.setUseJmx(false); broker.setUseShutdownHook(false); broker.addConnector(brokerUrl); // Setup the destination policy PolicyMap pm = new PolicyMap(); // Setup the topic destination policy PolicyEntry tpe = new PolicyEntry(); tpe.setTopic(">"); tpe.setMemoryLimit(destinationMemLimit); tpe.setProducerFlowControl(true); // Setup the topic destination policy PolicyEntry qpe = new PolicyEntry(); qpe.setQueue(">"); qpe.setMemoryLimit(destinationMemLimit); qpe.setProducerFlowControl(true); qpe.setQueuePrefetch(1); pm.setPolicyEntries(Arrays.asList(new PolicyEntry[] { tpe, qpe })); broker.setDestinationPolicy(pm); // Start the broker broker.start(); Destination destination = new ActiveMQTopic("test"); //Destination destination = new ActiveMQQueue("test"); // Create the connection factory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl); connectionFactory.setAlwaysSyncSend(true); connectionFactory.setProducerWindowSize(1024); // Start the test destination listener Connection c = connectionFactory.createConnection(); c.start(); c.createSession(false, 1).createConsumer(destination).setMessageListener(new BrokerTest()); // Start producing the test messages Session s = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer p = s.createProducer(destination); for (long i = 0; i < 2000000L; i++) { p.send(s.createTextMessage("test")); long count = produced.incrementAndGet(); if (count % 1000 == 0) { log.debug("Produced " + count / 1000 + "k messages"); } } } @Override public void onMessage(Message arg0) { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } long count = consumed.incrementAndGet(); if (count % 1000 == 0) { log.debug("\tConsumed " + count / 1000 + "k messages"); } } } {code} > Producer Flow Control Does Not Seem to Work with Topics > ------------------------------------------------------- > > Key: AMQ-2683 > URL: https://issues.apache.org/jira/browse/AMQ-2683 > Project: ActiveMQ > Issue Type: Bug > Components: Broker > Affects Versions: 5.3.0, 5.3.1 > Environment: Windows 2008 Server, Sun Java 6. > Reporter: Brad Willard > Priority: Minor > Fix For: 5.5.0 > > > I have multiple producers posting messages to queues and adding statistical information to a topic. I have one consumer of that topic subscribing and taking those messages and monitors for problems. The consumer is slower than publishing to the topic. I have flow control enabled and I expected that once the memory limit of the Consumers dispatch queue was reached it would throttle the publishers to the topic, but instead it seems to lock everything up. > The message producers are putting messages to each queue and the topic on separate sessions. The broker basically stops once it posts the message that it's going to throttle producers. The topic messages are messages with text attributes, they are non persistent and posted non-transactional using the AUTO_ACKNOWLEDGE mode. > All the producers and consumers use the same Connection factory with with these attributes set. > ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI); > factory.setProducerWindowSize(1024 * 1024); > factory.getPrefetchPolicy().setTopicPrefetch(10); > My activemq config is as follows: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.