Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 83255 invoked from network); 21 Jul 2010 17:11:05 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 21 Jul 2010 17:11:05 -0000 Received: (qmail 39732 invoked by uid 500); 21 Jul 2010 17:11:05 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 39689 invoked by uid 500); 21 Jul 2010 17:11:05 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 39682 invoked by uid 99); 21 Jul 2010 17:11:05 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Jul 2010 17:11:05 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Jul 2010 17:10:58 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1EAC02388C7D; Wed, 21 Jul 2010 17:09:34 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r966319 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/a... Date: Wed, 21 Jul 2010 17:09:33 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100721170934.1EAC02388C7D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Wed Jul 21 17:09:33 2010 New Revision: 966319 URL: http://svn.apache.org/viewvc?rev=966319&view=rev Log: resolve: https://issues.apache.org/activemq/browse/AMQ-2741 - visibility of abort slow consumer policy in via jmx Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=966319&r1=966318&r2=966319&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Jul 21 17:09:33 2010 @@ -657,7 +657,9 @@ public class ActiveMQMessageConsumer imp void doClose() throws JMSException { dispose(); RemoveInfo removeCommand = info.createRemoveCommand(); - LOG.info("remove: " + this.getConsumerId() + ", lasteDeliveredSequenceId:" + lastDeliveredSequenceId); + if (LOG.isDebugEnabled()) { + LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId); + } removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); this.session.asyncSendPacket(removeCommand); } Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java?rev=966319&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java Wed Jul 21 17:09:33 2010 @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; +import org.apache.activemq.broker.region.policy.SlowConsumerEntry; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.management.ObjectName; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import java.util.Map; + +public class AbortSlowConsumerStrategyView implements AbortSlowConsumerStrategyViewMBean { + private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategyView.class); + private ManagedRegionBroker broker; + private AbortSlowConsumerStrategy strategy; + + + public AbortSlowConsumerStrategyView(ManagedRegionBroker managedRegionBroker, AbortSlowConsumerStrategy slowConsumerStrategy) { + this.broker = managedRegionBroker; + this.strategy = slowConsumerStrategy; + } + + public long getMaxSlowCount() { + return strategy.getMaxSlowCount(); + } + + public void setMaxSlowCount(long maxSlowCount) { + strategy.setMaxSlowCount(maxSlowCount); + } + + public long getMaxSlowDuration() { + return strategy.getMaxSlowDuration(); + } + + public void setMaxSlowDuration(long maxSlowDuration) { + strategy.setMaxSlowDuration(maxSlowDuration); + } + + public long getCheckPeriod() { + return strategy.getCheckPeriod(); + } + + public TabularData getSlowConsumers() throws OpenDataException { + + OpenTypeSupport.OpenTypeFactory factory = OpenTypeSupport.getFactory(SlowConsumerEntry.class); + CompositeType ct = factory.getCompositeType(); + TabularType tt = new TabularType("SlowConsumers", "Table of current slow Consumers", ct, new String[] {"subscription" }); + TabularDataSupport rc = new TabularDataSupport(tt); + + int index = 0; + Map slowConsumers = strategy.getSlowConsumers(); + for (Map.Entry entry : slowConsumers.entrySet()) { + entry.getValue().setSubscription(broker.getSubscriberObjectName(entry.getKey())); + rc.put(OpenTypeSupport.convert(entry.getValue())); + } + return rc; + } + + public void abortConsumer(ObjectName consumerToAbort) { + Subscription sub = broker.getSubscriber(consumerToAbort); + if (sub != null) { + LOG.info("aborting consumer via jmx: " + sub.getConsumerInfo().getConsumerId()); + strategy.abortConsumer(sub, false); + } else { + LOG.warn("cannot resolve subscription matching name: " + consumerToAbort); + } + + } + + public void abortConnection(ObjectName consumerToAbort) { + Subscription sub = broker.getSubscriber(consumerToAbort); + if (sub != null) { + LOG.info("aborting consumer connection via jmx: " + sub.getConsumerInfo().getConsumerId().getConnectionId()); + strategy.abortConsumer(sub, true); + } else { + LOG.warn("cannot resolve subscription matching name: " + consumerToAbort); + } + } + + public void abortConsumer(String objectNameOfConsumerToAbort) { + abortConsumer(toObjectName(objectNameOfConsumerToAbort)); + } + + public void abortConnection(String objectNameOfConsumerToAbort) { + abortConnection(toObjectName(objectNameOfConsumerToAbort)); + } + + private ObjectName toObjectName(String objectName) { + ObjectName result = null; + try { + result = new ObjectName(objectName); + } catch (Exception e) { + LOG.warn("cannot create subscription ObjectName to abort, from string: " + objectName); + } + return result; + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyView.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java?rev=966319&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java Wed Jul 21 17:09:33 2010 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import javax.management.ObjectName; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; + +public interface AbortSlowConsumerStrategyViewMBean { + + @MBeanInfo("returns the current max slow count, -1 disables") + long getMaxSlowCount(); + + @MBeanInfo("sets the count after which a slow consumer will be aborted, -1 disables") + void setMaxSlowCount(long maxSlowCount); + + @MBeanInfo("returns the current max slow (milliseconds) duration") + long getMaxSlowDuration(); + + @MBeanInfo("sets the duration (milliseconds) after which a continually slow consumer will be aborted") + void setMaxSlowDuration(long maxSlowDuration); + + @MBeanInfo("returns the check period at which a sweep of consumers is done to determine continued slowness") + public long getCheckPeriod(); + + @MBeanInfo("returns the current list of slow consumers, Not HTML friendly") + TabularData getSlowConsumers() throws OpenDataException; + + @MBeanInfo("aborts the slow consumer gracefully by sending a shutdown control message to just that consumer") + void abortConsumer(ObjectName consumerToAbort); + + @MBeanInfo("aborts the slow consumer forcefully by shutting down it's connection, note: all other users of the connection will be affected") + void abortConnection(ObjectName consumerToAbort); + + @MBeanInfo("aborts the slow consumer gracefully by sending a shutdown control message to just that consumer") + void abortConsumer(String objectNameOfConsumerToAbort); + + @MBeanInfo("aborts the slow consumer forcefully by shutting down it's connection, note: all other users of the connection will be affected") + void abortConnection(String objectNameOfConsumerToAbort); +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AbortSlowConsumerStrategyViewMBean.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=966319&r1=966318&r2=966319&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Wed Jul 21 17:09:33 2010 @@ -39,6 +39,8 @@ import org.apache.activemq.ActiveMQConne import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; +import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTextMessage; @@ -388,4 +390,13 @@ public class DestinationView implements return answer; } + public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException { + ObjectName result = null; + SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy(); + if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) { + result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy); + } + return result; + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=966319&r1=966318&r2=966319&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Wed Jul 21 17:09:33 2010 @@ -332,4 +332,13 @@ public interface DestinationViewMBean { @MBeanInfo("returns all the current subscription MBeans matching this destination") ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException; + + /** + * Returns the slow consumer strategy MBean for this destination + * + * @return the name of the slow consumer handler MBean for this destination + */ + @MBeanInfo("returns the optional slowConsumer handler MBeans for this destination") + ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException; + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=966319&r1=966318&r2=966319&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Wed Jul 21 17:09:33 2010 @@ -53,6 +53,8 @@ import org.apache.activemq.broker.region import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicSubscription; +import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; +import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; @@ -260,10 +262,12 @@ public class ManagedRegionBroker extends } protected void unregisterDestination(ObjectName key) throws Exception { - topics.remove(key); - queues.remove(key); - temporaryQueues.remove(key); - temporaryTopics.remove(key); + + DestinationView view = null; + removeAndRemember(topics, key, view); + removeAndRemember(queues, key, view); + removeAndRemember(temporaryQueues, key, view); + removeAndRemember(temporaryTopics, key, view); if (registeredMBeans.remove(key)) { try { managementContext.unregisterMBean(key); @@ -272,6 +276,24 @@ public class ManagedRegionBroker extends LOG.debug("Failure reason: " + e, e); } } + if (view != null) { + key = view.getSlowConsumerStrategy(); + if (key!= null && registeredMBeans.remove(key)) { + try { + managementContext.unregisterMBean(key); + } catch (Throwable e) { + LOG.warn("Failed to unregister slow consumer strategy MBean: " + key); + LOG.debug("Failure reason: " + e, e); + } + } + } + } + + private void removeAndRemember(Map map, ObjectName key, DestinationView view) { + DestinationView candidate = map.remove(key); + if (candidate != null && view == null) { + view = candidate; + } } protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception { @@ -527,4 +549,42 @@ public class ManagedRegionBroker extends + JMXSupport.encodeObjectNamePart(destName.getPhysicalName())); return objectName; } + + public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException { + ObjectName objectName = null; + try { + objectName = createObjectName(strategy); + if (!registeredMBeans.contains(objectName)) { + AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy); + AnnotatedMBean.registerMBean(managementContext, view, objectName); + registeredMBeans.add(objectName); + } + } catch (Exception e) { + LOG.warn("Failed to register MBean: " + strategy); + LOG.debug("Failure reason: " + e, e); + } + return objectName; + } + + private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{ + Hashtable map = brokerObjectName.getKeyPropertyList(); + ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + + "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName())); + return objectName; + } + + public ObjectName getSubscriberObjectName(Subscription key) { + return subscriptionMap.get(key); + } + + public Subscription getSubscriber(ObjectName key) { + Subscription sub = null; + for (Entry entry: subscriptionMap.entrySet()) { + if (entry.getValue().equals(key)) { + sub = entry.getKey(); + break; + } + } + return sub; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?rev=966319&r1=966318&r2=966319&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java Wed Jul 21 17:09:33 2010 @@ -41,6 +41,9 @@ import javax.management.openmbean.OpenTy import javax.management.openmbean.SimpleType; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; + +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.SlowConsumerEntry; import org.apache.activemq.broker.scheduler.Job; import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQMapMessage; @@ -429,7 +432,30 @@ public final class OpenTypeSupport { } } + static class SlowConsumerEntryOpenTypeFactory extends AbstractOpenTypeFactory { + @Override + protected String getTypeName() { + return SlowConsumerEntry.class.getName(); + } + + @Override + protected void init() throws OpenDataException { + super.init(); + addItem("subscription", "the subscription view", SimpleType.OBJECTNAME); + addItem("slowCount", "number of times deemed slow", SimpleType.INTEGER); + addItem("markCount", "number of periods remaining slow", SimpleType.INTEGER); + } + @Override + public Map getFields(Object o) throws OpenDataException { + SlowConsumerEntry entry = (SlowConsumerEntry) o; + Map rc = super.getFields(o); + rc.put("subscription", entry.getSubscription()); + rc.put("slowCount", Integer.valueOf(entry.getSlowCount())); + rc.put("markCount", Integer.valueOf(entry.getMarkCount())); + return rc; + } + } static { OPEN_TYPE_FACTORIES.put(ActiveMQMessage.class, new MessageOpenTypeFactory()); @@ -439,6 +465,7 @@ public final class OpenTypeSupport { OPEN_TYPE_FACTORIES.put(ActiveMQStreamMessage.class, new StreamMessageOpenTypeFactory()); OPEN_TYPE_FACTORIES.put(ActiveMQTextMessage.class, new TextMessageOpenTypeFactory()); OPEN_TYPE_FACTORIES.put(Job.class, new JobOpenTypeFactory()); + OPEN_TYPE_FACTORIES.put(SlowConsumerEntry.class, new SlowConsumerEntryOpenTypeFactory()); } private OpenTypeSupport() { @@ -448,7 +475,7 @@ public final class OpenTypeSupport { return OPEN_TYPE_FACTORIES.get(clazz); } - public static CompositeData convert(Message message) throws OpenDataException { + public static CompositeData convert(Object message) throws OpenDataException { OpenTypeFactory f = getFactory(message.getClass()); if (f == null) { throw new OpenDataException("Cannot create a CompositeData for type: " + message.getClass().getName()); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=966319&r1=966318&r2=966319&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Wed Jul 21 17:09:33 2010 @@ -602,6 +602,10 @@ public abstract class BaseDestination im this.slowConsumerStrategy = slowConsumerStrategy; } + public SlowConsumerStrategy getSlowConsumerStrategy() { + return this.slowConsumerStrategy; + } + public boolean isPrioritizedMessages() { return this.prioritizedMessages; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=966319&r1=966318&r2=966319&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Wed Jul 21 17:09:33 2010 @@ -23,6 +23,7 @@ import org.apache.activemq.broker.Connec import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; +import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -215,4 +216,6 @@ public interface Destination extends Ser void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception; boolean isPrioritizedMessages(); + + SlowConsumerStrategy getSlowConsumerStrategy(); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=966319&r1=966318&r2=966319&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Wed Jul 21 17:09:33 2010 @@ -23,6 +23,7 @@ import org.apache.activemq.broker.Broker import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; +import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -273,4 +274,8 @@ public class DestinationFilter implement public boolean isPrioritizedMessages() { return next.isPrioritizedMessages(); } + + public SlowConsumerStrategy getSlowConsumerStrategy() { + return next.getSlowConsumerStrategy(); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=966319&r1=966318&r2=966319&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Jul 21 17:09:33 2010 @@ -407,8 +407,6 @@ public abstract class PrefetchSubscripti * Checks an ack versus the contents of the dispatched list. * * @param ack - * @param firstAckedMsg - * @param lastAckedMsg * @throws JMSException if it does not match */ protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java?rev=966319&r1=966318&r2=966319&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java Wed Jul 21 17:09:33 2010 @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.activemq.broker.region.policy; import java.util.HashMap; @@ -5,6 +21,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Subscription; @@ -23,7 +41,9 @@ public class AbortSlowConsumerStrategy i private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategy.class); + private String name = "AbortSlowConsumerStrategy@" + hashCode(); private Scheduler scheduler; + private Broker broker; private final AtomicBoolean taskStarted = new AtomicBoolean(false); private final Map slowConsumers = new ConcurrentHashMap(); @@ -32,10 +52,11 @@ public class AbortSlowConsumerStrategy i private long checkPeriod = 30*1000; private boolean abortConnection = false; - public void setScheduler(Scheduler s) { - this.scheduler=s; - } - + public void setBrokerService(Broker broker) { + this.scheduler = broker.getScheduler(); + this.broker = broker; + } + public void slowConsumer(ConnectionContext context, Subscription subs) { if (maxSlowCount < 0 && maxSlowDuration < 0) { // nothing to do @@ -75,21 +96,25 @@ public class AbortSlowConsumerStrategy i slowConsumers.remove(entry.getKey()); } } - + + abortSubscription(toAbort, abortConnection); + } + + private void abortSubscription(Map toAbort, boolean abortSubscriberConnection) { for (final Entry entry : toAbort.entrySet()) { ConnectionContext connectionContext = entry.getValue().context; if (connectionContext!= null) { try { - LOG.info("aborting " - + (abortConnection ? "connection" : "consumer") - + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId()); + LOG.info("aborting " + + (abortSubscriberConnection ? "connection" : "consumer") + + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId()); final Connection connection = connectionContext.getConnection(); - if (connection != null) { - if (abortConnection) { + if (connection != null) { + if (abortSubscriberConnection) { scheduler.executeAfterDelay(new Runnable() { public void run() { - connection.serviceException(new InactivityIOException("Consumer was slow too often (>" + connection.serviceException(new InactivityIOException("Consumer was slow too often (>" + maxSlowCount + ") or too long (>" + maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId())); }}, 0l); @@ -97,21 +122,36 @@ public class AbortSlowConsumerStrategy i // just abort the consumer by telling it to stop ConsumerControl stopConsumer = new ConsumerControl(); stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId()); - stopConsumer.setClose(true); + stopConsumer.setClose(true); connection.dispatchAsync(stopConsumer); } } else { LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext); } } catch (Exception e) { - LOG.info("exception on stopping " - + (abortConnection ? "connection" : "consumer") - + " to abort slow consumer: " + entry.getKey(), e); + LOG.info("exception on stopping " + + (abortSubscriberConnection ? "connection" : "consumer") + + " to abort slow consumer: " + entry.getKey(), e); } } } } - + + + public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) { + if (sub != null) { + SlowConsumerEntry entry = slowConsumers.remove(sub); + if (entry != null) { + Map toAbort = new HashMap(); + toAbort.put(sub, entry); + abortSubscription(toAbort, abortSubscriberConnection); + } else { + LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: " + sub); + } + } + } + + public long getMaxSlowCount() { return maxSlowCount; } @@ -120,7 +160,7 @@ public class AbortSlowConsumerStrategy i * number of times a subscription can be deemed slow before triggering abort * effect depends on dispatch rate as slow determination is done on dispatch */ - public void setMaxSlowCount(int maxSlowCount) { + public void setMaxSlowCount(long maxSlowCount) { this.maxSlowCount = maxSlowCount; } @@ -161,22 +201,15 @@ public class AbortSlowConsumerStrategy i this.abortConnection = abortConnection; } - static class SlowConsumerEntry { - - final ConnectionContext context; - int slowCount = 1; - int markCount = 0; - - SlowConsumerEntry(ConnectionContext context) { - this.context = context; - } + public void setName(String name) { + this.name = name; + } + + public String getName() { + return name; + } - public void slow() { - slowCount++; - } - - public void mark() { - markCount++; - } + public Map getSlowConsumers() { + return slowConsumers; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=966319&r1=966318&r2=966319&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed Jul 21 17:09:33 2010 @@ -157,7 +157,7 @@ public class PolicyEntry extends Destina destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); SlowConsumerStrategy scs = getSlowConsumerStrategy(); if (scs != null) { - scs.setScheduler(broker.getScheduler()); + scs.setBrokerService(broker); } destination.setSlowConsumerStrategy(scs); destination.setPrioritizedMessages(isPrioritizedMessages()); Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java?rev=966319&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java Wed Jul 21 17:09:33 2010 @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.broker.ConnectionContext; + +public class SlowConsumerEntry { + + final ConnectionContext context; + Object subscription; + int slowCount = 1; + int markCount = 0; + + SlowConsumerEntry(ConnectionContext context) { + this.context = context; + } + + public void slow() { + slowCount++; + } + + public void mark() { + markCount++; + } + + public void setSubscription(Object subscriptionObjectName) { + this.subscription = subscriptionObjectName; + } + + public Object getSubscription() { + return subscription; + } + + public int getSlowCount() { + return slowCount; + } + + public int getMarkCount() { + return markCount; + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerEntry.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java?rev=966319&r1=966318&r2=966319&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java Wed Jul 21 17:09:33 2010 @@ -1,8 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.thread.Scheduler; /* * a strategy for dealing with slow consumers @@ -10,6 +26,5 @@ import org.apache.activemq.thread.Schedu public interface SlowConsumerStrategy { void slowConsumer(ConnectionContext context, Subscription subs); - void setScheduler(Scheduler scheduler); - + void setBrokerService(Broker broker); } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java?rev=966319&r1=966318&r2=966319&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java Wed Jul 21 17:09:33 2010 @@ -16,41 +16,45 @@ */ package org.apache.activemq.broker.policy; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; - import junit.framework.Test; - import org.apache.activemq.JmsMultipleClientsTestSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean; +import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.util.MessageIdList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener { private static final Log LOG = LogFactory.getLog(AbortSlowConsumerTest.class); - + AbortSlowConsumerStrategy underTest; - + public boolean abortConnection = false; - public long checkPeriod = 2*1000; - public long maxSlowDuration = 5*1000; + public long checkPeriod = 2 * 1000; + public long maxSlowDuration = 5 * 1000; private List exceptions = new ArrayList(); - + @Override protected void setUp() throws Exception { exceptions.clear(); @@ -59,7 +63,7 @@ public class AbortSlowConsumerTest exten super.setUp(); createDestination(); } - + @Override protected BrokerService createBroker() throws Exception { BrokerService broker = super.createBroker(); @@ -79,7 +83,7 @@ public class AbortSlowConsumerTest exten public void testRegularConsumerIsNotAborted() throws Exception { startConsumers(destination); - for (Connection c: connections) { + for (Connection c : connections) { c.setExceptionListener(this); } startProducers(destination, 100); @@ -90,12 +94,12 @@ public class AbortSlowConsumerTest exten public void initCombosForTestLittleSlowConsumerIsNotAborted() { addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE}); } - + public void testLittleSlowConsumerIsNotAborted() throws Exception { startConsumers(destination); Entry consumertoAbort = consumers.entrySet().iterator().next(); consumertoAbort.getValue().setProcessingDelay(500); - for (Connection c: connections) { + for (Connection c : connections) { c.setExceptionListener(this); } startProducers(destination, 12); @@ -103,56 +107,102 @@ public class AbortSlowConsumerTest exten allMessagesList.assertAtLeastMessagesReceived(10); } - + public void initCombosForTestSlowConsumerIsAborted() { addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE}); addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE}); } - + public void testSlowConsumerIsAborted() throws Exception { startConsumers(destination); Entry consumertoAbort = consumers.entrySet().iterator().next(); - consumertoAbort.getValue().setProcessingDelay(8*1000); - for (Connection c: connections) { + consumertoAbort.getValue().setProcessingDelay(8 * 1000); + for (Connection c : connections) { c.setExceptionListener(this); } startProducers(destination, 100); - + consumertoAbort.getValue().assertMessagesReceived(1); - + TimeUnit.SECONDS.sleep(5); - - consumertoAbort.getValue().assertAtMostMessagesReceived(1); + + consumertoAbort.getValue().assertAtMostMessagesReceived(1); + } + + + public void testSlowConsumerIsAbortedViaJmx() throws Exception { + underTest.setMaxSlowDuration(60*1000); // so jmx does the abort + startConsumers(destination); + Entry consumertoAbort = consumers.entrySet().iterator().next(); + consumertoAbort.getValue().setProcessingDelay(8 * 1000); + for (Connection c : connections) { + c.setExceptionListener(this); + } + startProducers(destination, 100); + + consumertoAbort.getValue().assertMessagesReceived(1); + + ActiveMQDestination amqDest = (ActiveMQDestination)destination; + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:Type=" + + (amqDest.isTopic() ? "Topic" : "Queue") +",Destination=" + + amqDest.getPhysicalName() + ",BrokerName=localhost"); + + QueueViewMBean queue = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy(); + + assertNotNull(slowConsumerPolicyMBeanName); + + AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean) + broker.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true); + + TimeUnit.SECONDS.sleep(3); + + TabularData slowOnes = abortPolicy.getSlowConsumers(); + assertEquals("one slow consumers", 1, slowOnes.size()); + + LOG.info("slow ones:" + slowOnes); + + CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next(); + LOG.info("Slow one: " + slowOne); + + assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName); + abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription")); + + consumertoAbort.getValue().assertAtMostMessagesReceived(1); + + slowOnes = abortPolicy.getSlowConsumers(); + assertEquals("no slow consumers left", 0, slowOnes.size()); + } - + public void testOnlyOneSlowConsumerIsAborted() throws Exception { consumerCount = 10; startConsumers(destination); Entry consumertoAbort = consumers.entrySet().iterator().next(); - consumertoAbort.getValue().setProcessingDelay(8*1000); - for (Connection c: connections) { + consumertoAbort.getValue().setProcessingDelay(8 * 1000); + for (Connection c : connections) { c.setExceptionListener(this); } startProducers(destination, 100); - + allMessagesList.waitForMessagesToArrive(99); allMessagesList.assertAtLeastMessagesReceived(99); - + consumertoAbort.getValue().assertMessagesReceived(1); - + TimeUnit.SECONDS.sleep(5); - - consumertoAbort.getValue().assertAtMostMessagesReceived(1); + + consumertoAbort.getValue().assertAtMostMessagesReceived(1); } - + public void testAbortAlreadyClosingConsumers() throws Exception { consumerCount = 1; startConsumers(destination); for (MessageIdList list : consumers.values()) { - list.setProcessingDelay(6*1000); + list.setProcessingDelay(6 * 1000); } - for (Connection c: connections) { + for (Connection c : connections) { c.setExceptionListener(this); } startProducers(destination, 100); @@ -164,12 +214,12 @@ public class AbortSlowConsumerTest exten consumer.close(); } } - + public void initCombosForTestAbortAlreadyClosedConsumers() { addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE}); addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE}); } - + public void testAbortAlreadyClosedConsumers() throws Exception { Connection conn = createConnectionFactory().createConnection(); conn.setExceptionListener(this); @@ -182,17 +232,17 @@ public class AbortSlowConsumerTest exten TimeUnit.SECONDS.sleep(1); LOG.info("closing consumer: " + consumer); consumer.close(); - + TimeUnit.SECONDS.sleep(5); assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); } - + public void initCombosForTestAbortAlreadyClosedConnection() { addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE}); addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE}); } - + public void testAbortAlreadyClosedConnection() throws Exception { Connection conn = createConnectionFactory().createConnection(); conn.setExceptionListener(this); @@ -204,7 +254,7 @@ public class AbortSlowConsumerTest exten TimeUnit.SECONDS.sleep(1); LOG.info("closing connection: " + conn); conn.close(); - + TimeUnit.SECONDS.sleep(5); assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); } @@ -212,12 +262,12 @@ public class AbortSlowConsumerTest exten public void testAbortConsumerOnDeadConnection() throws Exception { // socket proxy on pause, close could hang?? } - + public void onException(JMSException exception) { exceptions.add(exception); - exception.printStackTrace(); + exception.printStackTrace(); } - + public static Test suite() { return suite(AbortSlowConsumerTest.class); }