activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4754
Date Tue, 28 Jan 2014 19:07:47 GMT
Updated Branches:
  refs/heads/trunk 713250f5f -> fde22a849


https://issues.apache.org/jira/browse/AMQ-4754

Add connection counts to Broker mbean

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fde22a84
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fde22a84
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fde22a84

Branch: refs/heads/trunk
Commit: fde22a8496a160fc746c6f193dea3f29d113c6e3
Parents: 713250f
Author: Timothy Bish <tabish121@gmai.com>
Authored: Tue Jan 28 14:07:39 2014 -0500
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Tue Jan 28 14:07:39 2014 -0500

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   |  57 +++++++++-
 .../apache/activemq/broker/jmx/BrokerView.java  |  15 ++-
 .../activemq/broker/jmx/BrokerViewMBean.java    |  16 ++-
 .../broker/jmx/ManagedRegionBroker.java         |  15 ++-
 .../apache/activemq/broker/jmx/MBeanTest.java   | 114 ++++++++++++-------
 5 files changed, 170 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fde22a84/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index fe6f59b..3c542ce 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -45,6 +45,8 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -56,7 +58,21 @@ import org.apache.activemq.ConfigurationException;
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
-import org.apache.activemq.broker.jmx.*;
+import org.apache.activemq.broker.jmx.AnnotatedMBean;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.jmx.ConnectorView;
+import org.apache.activemq.broker.jmx.ConnectorViewMBean;
+import org.apache.activemq.broker.jmx.HealthView;
+import org.apache.activemq.broker.jmx.HealthViewMBean;
+import org.apache.activemq.broker.jmx.JmsConnectorView;
+import org.apache.activemq.broker.jmx.JobSchedulerView;
+import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
+import org.apache.activemq.broker.jmx.ManagedRegionBroker;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.broker.jmx.NetworkConnectorView;
+import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
+import org.apache.activemq.broker.jmx.ProxyConnectorView;
 import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFactory;
@@ -94,7 +110,16 @@ import org.apache.activemq.transport.TransportFactorySupport;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.vm.VMTransportFactory;
 import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.*;
+import org.apache.activemq.util.BrokerSupport;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOExceptionHandler;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.InetAddressUtil;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ThreadPoolUtils;
+import org.apache.activemq.util.TimeUtils;
+import org.apache.activemq.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -206,6 +231,8 @@ public class BrokerService implements Service {
     private boolean networkConnectorStartAsync = false;
     private boolean allowTempAutoCreationOnSend;
     private JobSchedulerStore jobSchedulerStore;
+    private final AtomicLong totalConnections = new AtomicLong();
+    private final AtomicInteger currentConnections = new AtomicInteger();
 
     private long offlineDurableSubscriberTimeout = -1;
     private long offlineDurableSubscriberTaskSchedule = 300000;
@@ -2941,4 +2968,30 @@ public class BrokerService implements Service {
     public void setStoreOpenWireVersion(int storeOpenWireVersion) {
         this.storeOpenWireVersion = storeOpenWireVersion;
     }
+
+    /**
+     * @return the current number of connections on this Broker.
+     */
+    public int getCurrentConnections() {
+        return this.currentConnections.get();
+    }
+
+    /**
+     * @return the total number of connections this broker has handled since startup.
+     */
+    public long getTotalConnections() {
+        return this.totalConnections.get();
+    }
+
+    public void incrementCurrentConnections() {
+        this.currentConnections.incrementAndGet();
+    }
+
+    public void decrementCurrentConnections() {
+        this.currentConnections.decrementAndGet();
+    }
+
+    public void incrementTotalConnections() {
+        this.totalConnections.incrementAndGet();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fde22a84/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
index 5bad48b..f0c92c3 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
@@ -89,6 +89,16 @@ public class BrokerView implements BrokerViewMBean {
     }
 
     @Override
+    public int getCurrentConnectionsCount() {
+        return brokerService.getCurrentConnections();
+    }
+
+    @Override
+    public long getTotalConnectionsCount() {
+        return brokerService.getTotalConnections();
+    }
+
+    @Override
     public void gc() throws Exception {
         brokerService.getBroker().gc();
         try {
@@ -148,6 +158,7 @@ public class BrokerView implements BrokerViewMBean {
     /**
      * @return the average size of a message (bytes)
      */
+    @Override
     public double getAverageMessageSize() {
         return safeGetBroker().getDestinationStatistics().getMessageSize().getAverageSize();
     }
@@ -155,6 +166,7 @@ public class BrokerView implements BrokerViewMBean {
     /**
      * @return the max size of a message (bytes)
      */
+    @Override
     public long getMaxMessageSize() {
         return safeGetBroker().getDestinationStatistics().getMessageSize().getMaxSize();
     }
@@ -162,11 +174,11 @@ public class BrokerView implements BrokerViewMBean {
     /**
      * @return the min size of a message (bytes)
      */
+    @Override
     public long getMinMessageSize() {
         return safeGetBroker().getDestinationStatistics().getMessageSize().getMinSize();
     }
 
-
     public long getTotalMessagesCached() {
         return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount();
     }
@@ -431,7 +443,6 @@ public class BrokerView implements BrokerViewMBean {
         brokerService.getBroker().removeSubscription(context, info);
     }
 
-    //  doc comment inherited from BrokerViewMBean
     @Override
     public void reloadLog4jProperties() throws Throwable {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/fde22a84/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
index c787c5a..f3fc378 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
@@ -19,6 +19,7 @@ package org.apache.activemq.broker.jmx;
 import java.util.Map;
 
 import javax.management.ObjectName;
+
 import org.apache.activemq.Service;
 
 
@@ -53,6 +54,16 @@ public interface BrokerViewMBean extends Service {
     String getUptime();
 
     /**
+     * @return The current number of active connections on this Broker.
+     */
+    int getCurrentConnectionsCount();
+
+    /**
+     * @return The total number of connections serviced since this Broker was started.
+     */
+    long getTotalConnectionsCount();
+
+    /**
      * The Broker will flush it's caches so that the garbage collector can
      * reclaim more memory.
      *
@@ -88,7 +99,6 @@ public interface BrokerViewMBean extends Service {
     @MBeanInfo("Number of unacknowledged messages on the broker.")
     long getTotalMessageCount();
 
-
     @MBeanInfo("Average message size on this broker")
     double getAverageMessageSize();
 
@@ -121,7 +131,7 @@ public interface BrokerViewMBean extends Service {
     long getTempLimit();
 
     void setTempLimit(@MBeanInfo("bytes") long limit);
-    
+
     @MBeanInfo("Percent of job store limit used.")
     int getJobSchedulerStorePercentUsage();
 
@@ -148,6 +158,7 @@ public interface BrokerViewMBean extends Service {
     /**
      * Stop the broker and all it's components.
      */
+    @Override
     @MBeanInfo("Stop the broker and all its components.")
     void stop() throws Exception;
 
@@ -324,5 +335,4 @@ public interface BrokerViewMBean extends Service {
 
     @MBeanInfo("JMSJobScheduler")
     ObjectName getJMSJobScheduler();
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fde22a84/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
index bfbcfe6..0770bd1 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
@@ -61,6 +61,7 @@ import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
@@ -226,6 +227,19 @@ public class ManagedRegionBroker extends RegionBroker {
     }
 
     @Override
+    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
+        super.addConnection(context, info);
+        this.contextBroker.getBrokerService().incrementCurrentConnections();
+        this.contextBroker.getBrokerService().incrementTotalConnections();
+    }
+
+    @Override
+    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
+        super.removeConnection(context, info, error);
+        this.contextBroker.getBrokerService().decrementCurrentConnections();
+    }
+
+    @Override
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         Subscription sub = super.addConsumer(context, info);
         SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
@@ -440,7 +454,6 @@ public class ManagedRegionBroker extends RegionBroker {
             LOG.warn("Failed to register MBean {}", key);
             LOG.debug("Failure reason: ", e);
         }
-
     }
 
     protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/fde22a84/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index 1327dd2..a853b3e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -24,11 +24,26 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import javax.jms.*;
-import javax.management.*;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
+
 import junit.textui.TestRunner;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.BlobMessage;
@@ -78,7 +93,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
     public void testConnectors() throws Exception{
         ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
-        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
+        BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
 
         assertEquals("openwire URL port doesn't equal bind Address",
                      new URI(broker.getTransportConnectorByType("tcp")).getPort(),
@@ -105,7 +120,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ getDestinationString());
 
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         CompositeData[] compdatalist = queue.browse();
         int initialQueueSize = compdatalist.length;
@@ -143,7 +158,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         echo("Now browsing the second queue");
 
         queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ newDestination );
-        QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queueNew = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         long newQueuesize = queueNew.getQueueSize();
         echo("Second queue size: " + newQueuesize);
@@ -223,7 +238,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         useConnection(connection);
 
         ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ getDestinationString());
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         long initialQueueSize = queue.getQueueSize();
         echo("current queue size: " + initialQueueSize);
@@ -245,7 +260,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         Thread.sleep(1000);
 
         ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME );
-        QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
dlqQueueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName,
QueueViewMBean.class, true);
 
         long initialDlqSize = dlq.getQueueSize();
         CompositeData[] compdatalist = dlq.browse();
@@ -298,14 +313,14 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ getDestinationString() );
 
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         String newDestination = getSecondDestinationString();
         queue.moveMatchingMessagesTo("counter > 2", newDestination);
 
         queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ newDestination);
 
-        queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName,
QueueViewMBean.class, true);
         int movedSize = MESSAGE_COUNT-3;
         assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
 
@@ -382,19 +397,18 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         assertEquals("topic3 Durable subscriber count", 1, topic3.getConsumerCount());
     }
 
-    @SuppressWarnings("rawtypes")
     protected void assertSendViaMBean() throws Exception {
         String queueName = getDestinationString() + ".SendMBBean";
 
         ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
         echo("Create QueueView MBean...");
-        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
+        BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
         broker.addQueue(queueName);
 
         ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ queueName);
 
         echo("Create QueueView MBean...");
-        QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         proxy.purge();
 
@@ -422,6 +436,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         browseAndVerifyTypes(proxy, false);
     }
 
+    @SuppressWarnings("rawtypes")
     private void browseAndVerifyTypes(QueueViewMBean proxy, boolean allStrings) throws Exception
{
         CompositeData[] compdatalist = proxy.browse();
         if (compdatalist.length == 0) {
@@ -479,13 +494,13 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
         echo("Create QueueView MBean...");
-        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
+        BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
         broker.addQueue(queueName);
 
         ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ queueName);
 
         echo("Create QueueView MBean...");
-        QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         proxy.purge();
 
@@ -520,7 +535,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ getDestinationString());
 
         echo("Create QueueView MBean...");
-        QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         long concount = proxy.getConsumerCount();
         echo("Consumer Count :" + concount);
@@ -570,7 +585,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         // lets create a new topic
         ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
         echo("Create QueueView MBean...");
-        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
+        BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
 
         broker.addTopic(getDestinationString());
 
@@ -593,7 +608,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
     protected void assertConsumerCounts() throws Exception {
         ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
-        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
+        BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
 
         assertTrue("broker is not a slave", !broker.isSlave());
         // create 2 topics
@@ -602,8 +617,8 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName="
+ getDestinationString() + "1");
         ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName="
+ getDestinationString() + "2");
-        TopicViewMBean topic1 = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
topicObjName1, TopicViewMBean.class, true);
-        TopicViewMBean topic2 = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
topicObjName2, TopicViewMBean.class, true);
+        TopicViewMBean topic1 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
topicObjName1, TopicViewMBean.class, true);
+        TopicViewMBean topic2 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
topicObjName2, TopicViewMBean.class, true);
 
         assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount());
         assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
@@ -645,7 +660,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
     protected void assertProducerCounts() throws Exception {
         ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
-        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
+        BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
 
         assertTrue("broker is not a slave", !broker.isSlave());
         // create 2 topics
@@ -654,8 +669,8 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName="
+ getDestinationString() + "1");
         ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName="
+ getDestinationString() + "2");
-        TopicViewMBean topic1 = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
topicObjName1, TopicViewMBean.class, true);
-        TopicViewMBean topic2 = (TopicViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
topicObjName2, TopicViewMBean.class, true);
+        TopicViewMBean topic1 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
topicObjName1, TopicViewMBean.class, true);
+        TopicViewMBean topic2 = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
topicObjName2, TopicViewMBean.class, true);
 
         assertEquals("topic1 Producer count", 0, topic1.getProducerCount());
         assertEquals("topic2 Producer count", 0, topic2.getProducerCount());
@@ -737,6 +752,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         return objectName;
     }
 
+    @Override
     protected void setUp() throws Exception {
         bindAddress = "tcp://localhost:0";
         useTopic = false;
@@ -745,6 +761,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         mbeanServer = managementContext.getMBeanServer();
     }
 
+    @Override
     protected void tearDown() throws Exception {
         if (waitForKeyPress) {
             // We are running from the command line so let folks browse the
@@ -768,6 +785,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
     }
 
+    @Override
     protected BrokerService createBroker() throws Exception {
         BrokerService answer = new BrokerService();
         answer.setPersistent(false);
@@ -855,7 +873,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         connection = connectionFactory.createConnection();
 
         ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
-        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
+        BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
 
         assertEquals(0, broker.getDynamicDestinationProducers().length);
 
@@ -873,7 +891,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         ObjectName viewName = broker.getDynamicDestinationProducers()[0];
         assertNotNull(viewName);
-        ProducerViewMBean view = (ProducerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
viewName, ProducerViewMBean.class, true);
+        ProducerViewMBean view = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
viewName, ProducerViewMBean.class, true);
         assertNotNull(view);
 
         assertEquals("NOTSET", view.getDestinationName());
@@ -940,7 +958,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ getDestinationString());
 
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         CompositeData[] compdatalist = queue.browse();
         int initialQueueSize = compdatalist.length;
@@ -966,7 +984,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
     public void testDestinationOptionsAreVisible() throws Exception {
         ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ QUEUE_WITH_OPTIONS );
 
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         assertEquals("name match", QUEUE_WITH_OPTIONS, queue.getName());
 
@@ -992,7 +1010,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         MessageProducer producer = session.createProducer(queue);
 
         ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
-        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
+        BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
 
         Thread.sleep(100);
 
@@ -1053,7 +1071,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
         echo("Create QueueView MBean...");
-        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
+        BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
 
         assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length);
         assertEquals("Durable subscriber count", 0, broker.getInactiveDurableTopicSubscribers().length);
@@ -1108,7 +1126,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         MessageConsumer durable = session.createDurableSubscriber(topic, "Durable");
 
         ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
-        BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
+        BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
 
         Thread.sleep(100);
 
@@ -1117,7 +1135,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         assertTrue(broker.getQueueSubscribers().length == 1);
 
         ObjectName producerName = broker.getQueueProducers()[0];
-        ProducerViewMBean producerView = (ProducerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
producerName, ProducerViewMBean.class, true);
+        ProducerViewMBean producerView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
producerName, ProducerViewMBean.class, true);
         assertNotNull(producerView);
 
         if (expect) {
@@ -1127,7 +1145,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         }
 
         for (ObjectName name : broker.getTopicSubscribers()) {
-            SubscriptionViewMBean subscriberView = (SubscriptionViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
name, SubscriptionViewMBean.class, true);
+            SubscriptionViewMBean subscriberView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
name, SubscriptionViewMBean.class, true);
             if (expect) {
                 assertEquals("admin", subscriberView.getUserName());
             } else {
@@ -1136,7 +1154,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         }
 
         for (ObjectName name : broker.getQueueSubscribers()) {
-            SubscriptionViewMBean subscriberView = (SubscriptionViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
name, SubscriptionViewMBean.class, true);
+            SubscriptionViewMBean subscriberView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
name, SubscriptionViewMBean.class, true);
             if (expect) {
                 assertEquals("admin", subscriberView.getUserName());
             } else {
@@ -1152,7 +1170,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
             if (name.toString().endsWith("connectionName=MBeanTest")) {
 
                 ConnectionViewMBean connectionView =
-                    (ConnectionViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
name, ConnectionViewMBean.class, true);
+                    MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, ConnectionViewMBean.class,
true);
                 assertNotNull(connectionView);
 
                 if (expect) {
@@ -1175,14 +1193,14 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ getDestinationString());
 
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         String newDestination = getSecondDestinationString();
         queue.moveMatchingMessagesTo("", newDestination);
 
         queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ newDestination);
 
-        queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName,
QueueViewMBean.class, true);
         int movedSize = MESSAGE_COUNT;
         assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
 
@@ -1208,20 +1226,36 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
     }
 
+    public void testConnectionCounts() throws Exception {
+
+        ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
+        BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
+
+        assertEquals(0, broker.getCurrentConnectionsCount());
+
+        connection = connectionFactory.createConnection();
+        useConnection(connection);
+
+        assertEquals(1, broker.getCurrentConnectionsCount());
+        connection.close();
+        assertEquals(0, broker.getCurrentConnectionsCount());
+        assertEquals(1, broker.getTotalConnectionsCount());
+    }
+
     public void testCopyMessagesToRetainOrder() throws Exception {
         connection = connectionFactory.createConnection();
         useConnection(connection);
 
         ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ getDestinationString());
 
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         String newDestination = getSecondDestinationString();
         queue.copyMatchingMessagesTo("", newDestination);
 
         queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ newDestination );
 
-        queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName,
QueueViewMBean.class, true);
         int movedSize = MESSAGE_COUNT;
         assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
 
@@ -1253,7 +1287,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ getDestinationString());
 
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         String queueName = getDestinationString();
         queue.removeMatchingMessages("counter < 10");
@@ -1289,7 +1323,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
+ getDestinationString());
 
-        QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         CompositeData[] compdatalist = queue.browse();
         int initialQueueSize = compdatalist.length;
@@ -1335,6 +1369,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         ObjectName query = new ObjectName(domain + ":type=Broker,brokerName=localhost,endpoint=dynamicProducer,*");
         Set<ObjectInstance> mbeans = mbeanServer.queryMBeans(query, null);
         assertEquals(mbeans.size(), 1);
+        producer.close();
     }
 
     public void testDurableSubQuery() throws Exception  {
@@ -1346,5 +1381,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         ObjectName query = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test.topic,endpoint=Consumer,consumerId=Durable(*),*");
         Set<ObjectInstance> mbeans = mbeanServer.queryMBeans(query, null);
         assertEquals(mbeans.size(), 1);
+        sub.close();
     }
 }


Mime
View raw message