Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5E6EF7B68 for ; Thu, 20 Oct 2011 17:35:27 +0000 (UTC) Received: (qmail 52400 invoked by uid 500); 20 Oct 2011 17:35:27 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 52371 invoked by uid 500); 20 Oct 2011 17:35:27 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 52364 invoked by uid 99); 20 Oct 2011 17:35:27 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Oct 2011 17:35:27 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Oct 2011 17:35:25 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 11D0A238897D for ; Thu, 20 Oct 2011 17:35:05 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1186952 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/usecases/ Date: Thu, 20 Oct 2011 17:35:04 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111020173505.11D0A238897D@eris.apache.org> Author: gtully Date: Thu Oct 20 17:35:04 2011 New Revision: 1186952 URL: http://svn.apache.org/viewvc?rev=1186952&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3551 - exclude networkConnectors from sendFailIfNoSpace on producer flow control, with test for topic and queue networks Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1186952&r1=1186951&r2=1186952&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Oct 20 17:35:04 2011 @@ -601,11 +601,11 @@ public abstract class BaseDestination im } protected final void waitForSpace(ConnectionContext context, Usage usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException { - if (systemUsage.isSendFailIfNoSpace()) { + if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: " + usage + ": " + warning); throw new ResourceAllocationException(warning); } - if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { + if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) { getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning); throw new ResourceAllocationException(warning); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1186952&r1=1186951&r2=1186952&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Oct 20 17:35:04 2011 @@ -555,7 +555,7 @@ public class Queue extends BaseDestinati + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - if (systemUsage.isSendFailIfNoSpace()) { + if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." @@ -613,7 +613,7 @@ public class Queue extends BaseDestinati } }); - if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { + if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage .getSendFailIfNoSpaceAfterTimeout())); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1186952&r1=1186951&r2=1186952&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Oct 20 17:35:04 2011 @@ -298,7 +298,7 @@ public class Topic extends BaseDestinati + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - if (systemUsage.isSendFailIfNoSpace()) { + if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { throw new javax.jms.ResourceAllocationException("Usage Manager memory limit (" + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." @@ -427,7 +427,7 @@ public class Topic extends BaseDestinati + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"; - if (systemUsage.isSendFailIfNoSpace()) { + if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { throw new javax.jms.ResourceAllocationException(logMessage); } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java?rev=1186952&r1=1186951&r2=1186952&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java Thu Oct 20 17:35:04 2011 @@ -20,6 +20,7 @@ package org.apache.activemq.usecases; import java.net.URI; import java.util.Vector; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.jms.MessageConsumer; import junit.framework.Test; @@ -27,7 +28,9 @@ import org.apache.activemq.JmsMultipleBr import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.util.MessageIdList; import org.apache.commons.logging.Log; @@ -99,15 +102,6 @@ public class NetworkBridgeProducerFlowCo private static final Log LOG = LogFactory .getLog(NetworkBridgeProducerFlowControlTest.class); - // Consumer prefetch is disabled for broker1's consumers. - private static final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue( - NetworkBridgeProducerFlowControlTest.class.getSimpleName() - + ".slow.shared?consumer.prefetchSize=1"); - - private static final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue( - NetworkBridgeProducerFlowControlTest.class.getSimpleName() - + ".fast.shared?consumer.prefetchSize=1"); - // Combo flag set to true/false by the test framework. public boolean persistentTestMessages; public boolean networkIsAlwaysSendSync; @@ -146,6 +140,15 @@ public class NetworkBridgeProducerFlowCo final long TEST_MESSAGE_SIZE = 1024; final long SLOW_CONSUMER_DELAY_MILLIS = 100; + // Consumer prefetch is disabled for broker1's consumers. + final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".slow.shared?consumer.prefetchSize=1"); + + final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".fast.shared?consumer.prefetchSize=1"); + // Start a local and a remote broker. createBroker(new URI("broker:(tcp://localhost:0" + ")?brokerName=broker0&persistent=false&useJmx=true")); @@ -246,4 +249,139 @@ public class NetworkBridgeProducerFlowCo fastConsumerTime.get() < slowConsumerTime.get() / 10); } } + + public void testSendFailIfNoSpaceDoesNotBlockQueueNetwork() throws Exception { + // Consumer prefetch is disabled for broker1's consumers. + final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".slow.shared?consumer.prefetchSize=1"); + + final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".fast.shared?consumer.prefetchSize=1"); + + doTestSendFailIfNoSpaceDoesNotBlockNetwork( + SLOW_SHARED_QUEUE, + FAST_SHARED_QUEUE); + } + + public void testSendFailIfNoSpaceDoesNotBlockTopicNetwork() throws Exception { + // Consumer prefetch is disabled for broker1's consumers. + final ActiveMQTopic SLOW_SHARED_TOPIC = new ActiveMQTopic( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".slow.shared?consumer.prefetchSize=1"); + + final ActiveMQTopic FAST_SHARED_TOPIC = new ActiveMQTopic( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".fast.shared?consumer.prefetchSize=1"); + + doTestSendFailIfNoSpaceDoesNotBlockNetwork( + SLOW_SHARED_TOPIC, + FAST_SHARED_TOPIC); + } + + public void doTestSendFailIfNoSpaceDoesNotBlockNetwork( + ActiveMQDestination slowDestination, ActiveMQDestination fastDestination) throws Exception { + + final int NUM_MESSAGES = 100; + final long TEST_MESSAGE_SIZE = 1024; + final long SLOW_CONSUMER_DELAY_MILLIS = 100; + + // Start a local and a remote broker. + createBroker(new URI("broker:(tcp://localhost:0" + + ")?brokerName=broker0&persistent=false&useJmx=true")); + BrokerService remoteBroker = createBroker(new URI( + "broker:(tcp://localhost:0" + + ")?brokerName=broker1&persistent=false&useJmx=true")); + remoteBroker.getSystemUsage().setSendFailIfNoSpace(true); + + // Set a policy on the remote broker that limits the maximum size of the + // slow shared queue. + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setMemoryLimit(5 * TEST_MESSAGE_SIZE); + PolicyMap policyMap = new PolicyMap(); + policyMap.put(slowDestination, policyEntry); + remoteBroker.setDestinationPolicy(policyMap); + + // Create an outbound bridge from the local broker to the remote broker. + // The bridge is configured with the remoteDispatchType enhancement. + NetworkConnector nc = bridgeBrokers("broker0", "broker1"); + nc.setAlwaysSyncSend(true); + nc.setPrefetchSize(1); + + startAllBrokers(); + waitForBridgeFormation(); + + // Start two asynchronous consumers on the remote broker, one for each + // of the two shared queues, and keep track of how long it takes for + // each of the consumers to receive all the messages. + final CountDownLatch fastConsumerLatch = new CountDownLatch( + NUM_MESSAGES); + final CountDownLatch slowConsumerLatch = new CountDownLatch( + NUM_MESSAGES); + + final long startTimeMillis = System.currentTimeMillis(); + final AtomicLong fastConsumerTime = new AtomicLong(); + final AtomicLong slowConsumerTime = new AtomicLong(); + + Thread fastWaitThread = new Thread() { + @Override + public void run() { + try { + fastConsumerLatch.await(); + fastConsumerTime.set(System.currentTimeMillis() + - startTimeMillis); + } catch (InterruptedException ex) { + exceptions.add(ex); + Assert.fail(ex.getMessage()); + } + } + }; + + Thread slowWaitThread = new Thread() { + @Override + public void run() { + try { + slowConsumerLatch.await(); + slowConsumerTime.set(System.currentTimeMillis() + - startTimeMillis); + } catch (InterruptedException ex) { + exceptions.add(ex); + Assert.fail(ex.getMessage()); + } + } + }; + + fastWaitThread.start(); + slowWaitThread.start(); + + createConsumer("broker1", fastDestination, fastConsumerLatch); + MessageConsumer slowConsumer = createConsumer("broker1", + slowDestination, slowConsumerLatch); + MessageIdList messageIdList = brokers.get("broker1").consumers + .get(slowConsumer); + messageIdList.setProcessingDelay(SLOW_CONSUMER_DELAY_MILLIS); + + // Send the test messages to the local broker's shared queues. The + // messages are either persistent or non-persistent to demonstrate the + // difference between synchronous and asynchronous dispatch. + persistentDelivery = false; + sendMessages("broker0", fastDestination, NUM_MESSAGES); + sendMessages("broker0", slowDestination, NUM_MESSAGES); + + fastWaitThread.join(TimeUnit.SECONDS.toMillis(60)); + slowWaitThread.join(TimeUnit.SECONDS.toMillis(60)); + + assertTrue("no exceptions on the wait threads:" + exceptions, + exceptions.isEmpty()); + + LOG.info("Fast consumer duration (ms): " + fastConsumerTime.get()); + LOG.info("Slow consumer duration (ms): " + slowConsumerTime.get()); + + assertTrue("fast time set", fastConsumerTime.get() > 0); + assertTrue("slow time set", slowConsumerTime.get() > 0); + + // Verify the behaviour as described in the description of this class. + Assert.assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10); + } } \ No newline at end of file