Author: gtully Date: Wed May 19 11:56:52 2010 New Revision: 946138 URL: http://svn.apache.org/viewvc?rev=946138&view=rev Log: https://issues.apache.org/activemq/browse/AMQ-378 - add AbortSlowConsumerStrategy destination policy that will abort consumers that are repeatedly slow or slow for a defined period. Slowness is a product of the prefetch and message production rate. Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java (with props) activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=946138&r1=946137&r2=946138&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Wed May 19 11:56:52 2010 @@ -48,6 +48,7 @@ public abstract class AbstractSubscripti private BooleanExpression selectorExpression; private ObjectName objectName; private int cursorMemoryHighWaterMark = 70; + private boolean slowConsumer; public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { @@ -162,6 +163,14 @@ public abstract class AbstractSubscripti public boolean isRecoveryRequired() { return true; } + + public boolean isSlowConsumer() { + return slowConsumer; + } + + public void setSlowConsumer(boolean val) { + slowConsumer = val; + } public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception { boolean result = false; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=946138&r1=946137&r2=946138&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Wed May 19 11:56:52 2010 @@ -26,6 +26,7 @@ import org.apache.activemq.broker.Broker import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import 
org.apache.activemq.broker.region.policy.DeadLetterStrategy; +import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; @@ -81,6 +82,7 @@ public abstract class BaseDestination im private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; protected int cursorMemoryHighWaterMark = 70; protected int storeUsageHighWaterMark = 100; + private SlowConsumerStrategy slowConsumerStrategy; /** * @param broker @@ -449,6 +451,9 @@ public abstract class BaseDestination im if (advisoryForSlowConsumers) { broker.slowConsumer(context, this, subs); } + if (slowConsumerStrategy != null) { + slowConsumerStrategy.slowConsumer(context, subs); + } } /** @@ -573,5 +578,9 @@ public abstract class BaseDestination im } protected abstract Log getLog(); + + public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { + this.slowConsumerStrategy = slowConsumerStrategy; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=946138&r1=946137&r2=946138&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed May 19 11:56:52 2010 @@ -262,6 +262,7 @@ public class DurableTopicSubscription ex } dispatched.clear(); } + setSlowConsumer(false); } /** Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=946138&r1=946137&r2=946138&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed May 19 11:56:52 2010 @@ -70,8 +70,6 @@ public abstract class PrefetchSubscripti private final Object pendingLock = new Object(); private final Object dispatchLock = new Object(); protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(); - private boolean slowConsumer; - private CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1); public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException { @@ -565,7 +563,7 @@ public abstract class PrefetchSubscripti try { int numberToDispatch = countBeforeFull(); if (numberToDispatch > 0) { - slowConsumer=false; + setSlowConsumer(false); pending.setMaxBatchSize(numberToDispatch); int count = 0; pending.reset(); @@ -598,15 +596,10 @@ public abstract class PrefetchSubscripti } } } - }else { - if (!slowConsumer) { - slowConsumer=true; - ConnectionContext c = new ConnectionContext(); - c.setBroker(context.getBroker()); - for (Destination dest :destinations) { - dest.slowConsumer(c,this); - } - + } else if (!isSlowConsumer()) { + setSlowConsumer(true); + for (Destination dest :destinations) { + dest.slowConsumer(context, this); } } } finally { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=946138&r1=946137&r2=946138&view=diff 
============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Wed May 19 11:56:52 2010 @@ -103,6 +103,7 @@ public class QueueSubscription extends P /** */ public void destroy() { + setSlowConsumer(false); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=946138&r1=946137&r2=946138&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Wed May 19 11:56:52 2010 @@ -227,4 +227,6 @@ public interface Subscription extends Su public int getCursorMemoryHighWaterMark(); public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark); + + boolean isSlowConsumer(); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=946138&r1=946137&r2=946138&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed May 19 11:56:52 2010 @@ -62,8 +62,6 @@ public class TopicSubscription extends A private final AtomicLong enqueueCounter = new AtomicLong(0); private final AtomicLong 
dequeueCounter = new AtomicLong(0); private int memoryUsageHighWaterMark = 95; - private boolean slowConsumer; - // allow duplicate suppression in a ring network of brokers protected int maxProducersToAudit = 1024; protected int maxAuditDepth = 1000; @@ -99,11 +97,11 @@ public class TopicSubscription extends A // if maximumPendingMessages is set we will only discard messages which // have not been dispatched (i.e. we allow the prefetch buffer to be filled) dispatch(node); - slowConsumer=false; + setSlowConsumer(false); } else { //we are slow - if(!slowConsumer) { - slowConsumer=true; + if(!isSlowConsumer()) { + setSlowConsumer(true); for (Destination dest: destinations) { dest.slowConsumer(getContext(), this); } @@ -540,6 +538,7 @@ public class TopicSubscription extends A LOG.warn("Failed to destroy cursor", e); } } + setSlowConsumer(false); } public int getPrefetchSize() { Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java?rev=946138&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java Wed May 19 11:56:52 2010 @@ -0,0 +1,179 @@ +package org.apache.activemq.broker.region.policy; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.broker.Connection; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ConsumerControl; +import 
org.apache.activemq.thread.Scheduler; +import org.apache.activemq.transport.InactivityIOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Abort slow consumers when they reach the configured threshold of slowness, default is slow for 30 seconds + * + * @org.apache.xbean.XBean + */ +public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable { + + private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategy.class); + + private static final Scheduler scheduler = Scheduler.getInstance(); + private AtomicBoolean taskStarted = new AtomicBoolean(false); + private Map slowConsumers = new ConcurrentHashMap(); + + private long maxSlowCount = -1; + private long maxSlowDuration = 30*1000; + private long checkPeriod = 30*1000; + private boolean abortConnection = false; + + public void slowConsumer(ConnectionContext context, Subscription subs) { + if (maxSlowCount < 0 && maxSlowDuration < 0) { + // nothing to do + LOG.info("no limits set, slowConsumer strategy has nothing to do"); + return; + } + + if (taskStarted.compareAndSet(false, true)) { + scheduler.executePeriodically(this, checkPeriod); + } + + if (!slowConsumers.containsKey(subs)) { + slowConsumers.put(subs, new SlowConsumerEntry(context)); + } else if (maxSlowCount > 0) { + slowConsumers.get(subs).slow(); + } + } + + public void run() { + if (maxSlowDuration > 0) { + // mark + for (SlowConsumerEntry entry : slowConsumers.values()) { + entry.mark(); + } + } + + HashMap toAbort = new HashMap(); + for (Entry entry : slowConsumers.entrySet()) { + if (entry.getKey().isSlowConsumer()) { + if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod > maxSlowDuration) + || maxSlowCount > 0 && entry.getValue().slowCount > maxSlowCount) { + toAbort.put(entry.getKey(), entry.getValue()); + slowConsumers.remove(entry.getKey()); + } + } else { + LOG.info("sub: " + entry.getKey().getConsumerInfo().getConsumerId() + " is no longer 
slow"); + slowConsumers.remove(entry.getKey()); + } + } + + for (final Entry entry : toAbort.entrySet()) { + ConnectionContext connectionContext = entry.getValue().context; + if (connectionContext!= null) { + try { + LOG.info("aborting " + + (abortConnection ? "connection" : "consumer") + + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId()); + + final Connection connection = connectionContext.getConnection(); + if (connection != null) { + if (abortConnection) { + scheduler.executeAfterDelay(new Runnable() { + public void run() { + connection.serviceException(new InactivityIOException("Consumer was slow too often (>" + + maxSlowCount + ") or too long (>" + + maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId())); + }}, 0l); + } else { + // just abort the consumer by telling it to stop + ConsumerControl stopConsumer = new ConsumerControl(); + stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId()); + stopConsumer.setClose(true); + connection.dispatchAsync(stopConsumer); + } + } else { + LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext); + } + } catch (Exception e) { + LOG.info("exception on stopping " + + (abortConnection ? "connection" : "consumer") + + " to abort slow consumer: " + entry.getKey(), e); + } + } + } + } + + public long getMaxSlowCount() { + return maxSlowCount; + } + + /** + * number of times a subscription can be deemed slow before triggering abort + * effect depends on dispatch rate as slow determination is done on dispatch + */ + public void setMaxSlowCount(int maxSlowCount) { + this.maxSlowCount = maxSlowCount; + } + + public long getMaxSlowDuration() { + return maxSlowDuration; + } + + /** + * time in milliseconds that a sub can remain slow before triggering + * an abort. 
+ * @param maxSlowDuration + */ + public void setMaxSlowDuration(long maxSlowDuration) { + this.maxSlowDuration = maxSlowDuration; + } + + public long getCheckPeriod() { + return checkPeriod; + } + + /** + * time in milliseconds between checks for slow subscriptions + * @param checkPeriod + */ + public void setCheckPeriod(long checkPeriod) { + this.checkPeriod = checkPeriod; + } + + public boolean isAbortConnection() { + return abortConnection; + } + + /** + * abort the consumers connection rather than sending a stop command to the remote consumer + * @param abortConnection + */ + public void setAbortConnection(boolean abortConnection) { + this.abortConnection = abortConnection; + } + + static class SlowConsumerEntry { + + final ConnectionContext context; + int slowCount = 1; + int markCount = 0; + + SlowConsumerEntry(ConnectionContext context) { + this.context = context; + } + + public void slow() { + slowCount++; + } + + public void mark() { + markCount++; + } + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=946138&r1=946137&r2=946138&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original) +++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed May 19 11:56:52 2010 @@ -86,6 +86,7 @@ public class PolicyEntry extends Destina private boolean usePrefetchExtension = true; private int cursorMemoryHighWaterMark = 70; private int storeUsageHighWaterMark = 100; + private SlowConsumerStrategy slowConsumerStrategy; public void configure(Broker broker,Queue queue) { @@ -147,6 +148,7 @@ public class PolicyEntry extends Destina destination.setMaxExpirePageSize(getMaxExpirePageSize()); destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); + destination.setSlowConsumerStrategy(getSlowConsumerStrategy()); } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { @@ -724,4 +726,12 @@ public class PolicyEntry extends Destina return storeUsageHighWaterMark; } + public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { + this.slowConsumerStrategy = slowConsumerStrategy; + } + + public SlowConsumerStrategy getSlowConsumerStrategy() { + return this.slowConsumerStrategy; + } + } Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java?rev=946138&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java Wed May 19 11:56:52 2010 @@ -0,0 +1,13 @@ +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Subscription; + +/* + 
* a strategy for dealing with slow consumers + */ +public interface SlowConsumerStrategy { + + void slowConsumer(ConnectionContext context, Subscription subs); + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java?rev=946138&r1=946137&r2=946138&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java Wed May 19 11:56:52 2010 @@ -61,7 +61,7 @@ public class JmsMultipleClientsTestSuppo protected boolean useConcurrentSend = true; protected boolean durable; - protected boolean topic; + public boolean topic; protected boolean persistent; protected BrokerService broker; @@ -115,6 +115,7 @@ public class JmsMultipleClientsTestSuppo } protected void sendMessages(Connection connection, Destination destination, int count) throws Exception { + connections.add(connection); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -195,6 +196,9 @@ public class JmsMultipleClientsTestSuppo protected ActiveMQDestination createDestination() throws JMSException { String name = "." + getClass().getName() + "." 
+ getName(); + // ensure not inadvertently composite because of combos + name = name.replace(' ','_'); + name = name.replace(',','&'); if (topic) { destination = new ActiveMQTopic("Topic" + name); return (ActiveMQDestination)destination; Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java?rev=946138&r1=946137&r2=946138&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java Wed May 19 11:56:52 2010 @@ -52,7 +52,6 @@ import org.apache.activemq.broker.region import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.util.ThreadTracker; import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -82,7 +81,6 @@ public class MessageEvictionTest { @After public void tearDown() throws Exception { - ThreadTracker.result(); connection.stop(); broker.stop(); } @@ -155,6 +153,7 @@ public class MessageEvictionTest { ExecutorService executor = Executors.newCachedThreadPool(); final CountDownLatch doAck = new CountDownLatch(1); + final CountDownLatch ackDone = new CountDownLatch(1); final CountDownLatch consumerRegistered = new CountDownLatch(1); executor.execute(new Runnable() { public void run() { @@ -167,15 +166,18 @@ public class MessageEvictionTest { doAck.await(60, TimeUnit.SECONDS); LOG.info("acking: " + message.getJMSMessageID()); message.acknowledge(); + ackDone.countDown(); } catch (Exception e) { - e.printStackTrace(); - consumerRegistered.countDown(); + e.printStackTrace(); 
fail(e.toString()); + } finally { + consumerRegistered.countDown(); + ackDone.countDown(); } } }); consumerRegistered.countDown(); - doAck.await(60, TimeUnit.SECONDS); + ackDone.await(60, TimeUnit.SECONDS); consumer.close(); } catch (Exception e) { e.printStackTrace(); @@ -256,7 +258,8 @@ public class MessageEvictionTest { // to keep the limit in check and up to date rather than just the first few, evict some OldestMessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); - messageEvictionStrategy.setEvictExpiredMessagesHighWatermark(100); + // whether to check expiry before eviction, default limit 1000 is fine as no ttl set in this test + //messageEvictionStrategy.setEvictExpiredMessagesHighWatermark(1000); entry.setMessageEvictionStrategy(messageEvictionStrategy); // let evicted messaged disappear Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java?rev=946138&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java Wed May 19 11:56:52 2010 @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import junit.framework.Test; + +import org.apache.activemq.JmsMultipleClientsTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.util.MessageIdList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + + +public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener { + + private static final Log LOG = LogFactory.getLog(AbortSlowConsumerTest.class); + + AbortSlowConsumerStrategy underTest; + + public boolean abortConnection = false; + public long checkPeriod = 2*1000; + public long maxSlowDuration = 5*1000; + + private List exceptions = new ArrayList(); + + @Override + protected void setUp() throws Exception { + exceptions.clear(); + topic = true; + underTest = new AbortSlowConsumerStrategy(); + super.setUp(); + createDestination(); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + PolicyEntry policy = new PolicyEntry(); + 
underTest.setAbortConnection(abortConnection); + underTest.setCheckPeriod(checkPeriod); + underTest.setMaxSlowDuration(maxSlowDuration); + + policy.setSlowConsumerStrategy(underTest); + policy.setQueuePrefetch(10); + policy.setTopicPrefetch(10); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + return broker; + } + + public void testRegularConsumerIsNotAborted() throws Exception { + startConsumers(destination); + for (Connection c: connections) { + c.setExceptionListener(this); + } + startProducers(destination, 100); + allMessagesList.waitForMessagesToArrive(10); + allMessagesList.assertAtLeastMessagesReceived(10); + } + + public void initCombosForTestLittleSlowConsumerIsNotAborted() { + addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE}); + } + + public void testLittleSlowConsumerIsNotAborted() throws Exception { + startConsumers(destination); + Entry consumertoAbort = consumers.entrySet().iterator().next(); + consumertoAbort.getValue().setProcessingDelay(500); + for (Connection c: connections) { + c.setExceptionListener(this); + } + startProducers(destination, 12); + allMessagesList.waitForMessagesToArrive(10); + allMessagesList.assertAtLeastMessagesReceived(10); + } + + + public void initCombosForTestSlowConsumerIsAborted() { + addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE}); + addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE}); + } + + public void testSlowConsumerIsAborted() throws Exception { + startConsumers(destination); + Entry consumertoAbort = consumers.entrySet().iterator().next(); + consumertoAbort.getValue().setProcessingDelay(8*1000); + for (Connection c: connections) { + c.setExceptionListener(this); + } + startProducers(destination, 100); + + consumertoAbort.getValue().assertMessagesReceived(1); + + TimeUnit.SECONDS.sleep(5); + + consumertoAbort.getValue().assertAtMostMessagesReceived(1); + } + + + public void 
testOnlyOneSlowConsumerIsAborted() throws Exception { + consumerCount = 10; + startConsumers(destination); + Entry consumertoAbort = consumers.entrySet().iterator().next(); + consumertoAbort.getValue().setProcessingDelay(8*1000); + for (Connection c: connections) { + c.setExceptionListener(this); + } + startProducers(destination, 100); + + allMessagesList.waitForMessagesToArrive(99); + allMessagesList.assertAtLeastMessagesReceived(99); + + consumertoAbort.getValue().assertMessagesReceived(1); + + TimeUnit.SECONDS.sleep(5); + + consumertoAbort.getValue().assertAtMostMessagesReceived(1); + } + + public void testAbortAlreadyClosingConsumers() throws Exception { + consumerCount = 1; + startConsumers(destination); + for (MessageIdList list : consumers.values()) { + list.setProcessingDelay(6*1000); + } + for (Connection c: connections) { + c.setExceptionListener(this); + } + startProducers(destination, 100); + allMessagesList.waitForMessagesToArrive(consumerCount); + + for (MessageConsumer consumer : consumers.keySet()) { + LOG.info("closing consumer: " + consumer); + /// will block waiting for on message till 6secs expire + consumer.close(); + } + } + + public void initCombosForTestAbortAlreadyClosedConsumers() { + addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE}); + addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE}); + } + + public void testAbortAlreadyClosedConsumers() throws Exception { + Connection conn = createConnectionFactory().createConnection(); + conn.setExceptionListener(this); + connections.add(conn); + + Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageConsumer consumer = sess.createConsumer(destination); + conn.start(); + startProducers(destination, 20); + TimeUnit.SECONDS.sleep(1); + LOG.info("closing consumer: " + consumer); + consumer.close(); + + TimeUnit.SECONDS.sleep(5); + assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); + } + + + 
public void initCombosForTestAbortAlreadyClosedConnection() { + addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE}); + addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE}); + } + + public void testAbortAlreadyClosedConnection() throws Exception { + Connection conn = createConnectionFactory().createConnection(); + conn.setExceptionListener(this); + + Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + sess.createConsumer(destination); + conn.start(); + startProducers(destination, 20); + TimeUnit.SECONDS.sleep(1); + LOG.info("closing connection: " + conn); + conn.close(); + + TimeUnit.SECONDS.sleep(5); + assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); + } + + public void testAbortConsumerOnDeadConnection() throws Exception { + // socket proxy on pause, close could hang?? + } + + public void onException(JMSException exception) { + exceptions.add(exception); + exception.printStackTrace(); + } + + public static Test suite() { + return suite(AbortSlowConsumerTest.class); + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=946138&r1=946137&r2=946138&view=diff ============================================================================== --- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Wed May 19 11:56:52 2010 @@ -291,8 +291,12 @@ public class QueueDuplicatesFromStoreTes } public void setCursorMemoryHighWaterMark( - int cursorMemoryHighWaterMark) { + int cursorMemoryHighWaterMark) { } + + public boolean isSlowConsumer() { + return false; + } }; queue.addSubscription(contextNotInTx, subscription);