Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2C363E600 for ; Wed, 2 Jan 2013 21:10:01 +0000 (UTC) Received: (qmail 85057 invoked by uid 500); 2 Jan 2013 21:10:01 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 84990 invoked by uid 500); 2 Jan 2013 21:10:01 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 84983 invoked by uid 99); 2 Jan 2013 21:10:01 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Jan 2013 21:10:01 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Jan 2013 21:09:57 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 19F5A2388980; Wed, 2 Jan 2013 21:09:37 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1428051 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java Date: Wed, 02 Jan 2013 21:09:36 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130102210937.19F5A2388980@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Wed Jan 2 21:09:36 2013 New Revision: 1428051 URL: http://svn.apache.org/viewvc?rev=1428051&view=rev Log: code fix and test fix for: https://issues.apache.org/jira/browse/AMQ-4225 AbortSlowConsumerStrategy was broken due to a malformed ObjectName being created when the strategy was registered on-demand. Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1428051&r1=1428050&r2=1428051&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Wed Jan 2 21:09:36 2013 @@ -19,7 +19,6 @@ package org.apache.activemq.broker.jmx; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.Hashtable; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -40,12 +39,23 @@ import javax.management.openmbean.OpenDa import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; -import org.apache.activemq.broker.region.*; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFactory; +import org.apache.activemq.broker.region.DestinationFactoryImpl; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.Region; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.broker.region.TopicRegion; +import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -568,19 +578,23 @@ public class ManagedRegionBroker extends ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName()); TopicMessageStore store = adapter.createTopicMessageStore(topic); store.recover(new MessageRecoveryListener() { + @Override public boolean recoverMessage(Message message) throws Exception { result.add(message); return true; } + @Override public boolean recoverMessageReference(MessageId messageReference) throws Exception { throw new RuntimeException("Should not be called."); } + @Override public boolean hasSpace() { return true; } + @Override public boolean isDuplicate(MessageId id) { return false; } @@ -760,8 +774,7 @@ public class ManagedRegionBroker extends private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{ String objectNameStr = this.brokerObjectName.toString(); - objectNameStr += "Service=SlowConsumerStrategy,InstanceName="+ JMXSupport.encodeObjectNamePart(strategy.getName()); - Hashtable map = brokerObjectName.getKeyPropertyList(); + objectNameStr += ",Service=SlowConsumerStrategy,InstanceName="+ JMXSupport.encodeObjectNamePart(strategy.getName()); ObjectName objectName = new ObjectName(objectNameStr); return objectName; } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java?rev=1428051&r1=1428050&r2=1428051&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java Wed Jan 2 21:09:36 2013 @@ -16,12 +16,28 @@ */ package org.apache.activemq.broker.policy; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; import javax.management.InstanceNotFoundException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + import junit.framework.Test; + import org.apache.activemq.JmsMultipleClientsTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean; -import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -30,20 +46,6 @@ import org.apache.activemq.util.MessageI import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.Connection; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularData; -import java.lang.reflect.UndeclaredThrowableException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; - public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener { @@ -55,7 +57,7 @@ public class AbortSlowConsumerTest exten public long checkPeriod = 2 * 1000; public long maxSlowDuration = 5 * 1000; - private List exceptions = new ArrayList(); + private final List exceptions = new ArrayList(); @Override protected void setUp() throws Exception { @@ -109,7 +111,6 @@ public class AbortSlowConsumerTest exten allMessagesList.assertAtLeastMessagesReceived(10); } - public void initCombosForTestSlowConsumerIsAborted() { addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE}); addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE}); @@ -125,13 +126,10 @@ public class AbortSlowConsumerTest exten startProducers(destination, 100); consumertoAbort.getValue().assertMessagesReceived(1); - TimeUnit.SECONDS.sleep(5); - consumertoAbort.getValue().assertAtMostMessagesReceived(1); } - public void testSlowConsumerIsAbortedViaJmx() throws Exception { underTest.setMaxSlowDuration(60*1000); // so jmx does the abort startConsumers(destination); @@ -145,11 +143,11 @@ public class AbortSlowConsumerTest exten consumertoAbort.getValue().assertMessagesReceived(1); ActiveMQDestination amqDest = (ActiveMQDestination)destination; - ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:Type=" + - (amqDest.isTopic() ? "Topic" : "Queue") +",Destination=" - + amqDest.getPhysicalName() + ",BrokerName=localhost"); + ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" + + (amqDest.isTopic() ? "Topic" : "Queue") +",destinationName=" + + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost"); - QueueViewMBean queue = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + DestinationViewMBean queue = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true); ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy(); assertNotNull(slowConsumerPolicyMBeanName); @@ -185,10 +183,8 @@ public class AbortSlowConsumerTest exten assertTrue("correct exception: " + expected.getCause(), expected.getCause() instanceof InstanceNotFoundException); } - } - public void testOnlyOneSlowConsumerIsAborted() throws Exception { consumerCount = 10; startConsumers(destination); @@ -203,9 +199,7 @@ public class AbortSlowConsumerTest exten allMessagesList.assertAtLeastMessagesReceived(99); consumertoAbort.getValue().assertMessagesReceived(1); - TimeUnit.SECONDS.sleep(5); - consumertoAbort.getValue().assertAtMostMessagesReceived(1); } @@ -276,6 +270,7 @@ public class AbortSlowConsumerTest exten // socket proxy on pause, close could hang?? } + @Override public void onException(JMSException exception) { exceptions.add(exception); exception.printStackTrace();