activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1296720 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ test/java/org/apache/activemq/broker/jmx/
Date Sat, 03 Mar 2012 23:30:10 GMT
Author: tabish
Date: Sat Mar  3 23:30:09 2012
New Revision: 1296720

URL: http://svn.apache.org/viewvc?rev=1296720&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-1905

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java?rev=1296720&r1=1296719&r2=1296720&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
Sat Mar  3 23:30:09 2012
@@ -16,21 +16,36 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import java.io.IOException;
+import java.util.Set;
+
+import javax.management.ObjectName;
+
 import org.apache.activemq.broker.Connection;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.JMXSupport;
 
 public class ConnectionView implements ConnectionViewMBean {
 
     private final Connection connection;
+    private final ManagementContext managementContext;
     private String userName;
 
     public ConnectionView(Connection connection) {
+        this(connection, null);
+    }
+
+    public ConnectionView(Connection connection, ManagementContext managementContext) {
         this.connection = connection;
+        this.managementContext = managementContext;
     }
 
+    @Override
     public void start() throws Exception {
         connection.start();
     }
 
+    @Override
     public void stop() throws Exception {
         connection.stop();
     }
@@ -38,6 +53,7 @@ public class ConnectionView implements C
     /**
      * @return true if the Connection is slow
      */
+    @Override
     public boolean isSlow() {
         return connection.isSlow();
     }
@@ -45,6 +61,7 @@ public class ConnectionView implements C
     /**
      * @return if after being marked, the Connection is still writing
      */
+    @Override
     public boolean isBlocked() {
         return connection.isBlocked();
     }
@@ -52,6 +69,7 @@ public class ConnectionView implements C
     /**
      * @return true if the Connection is connected
      */
+    @Override
     public boolean isConnected() {
         return connection.isConnected();
     }
@@ -59,10 +77,12 @@ public class ConnectionView implements C
     /**
      * @return true if the Connection is active
      */
+    @Override
     public boolean isActive() {
         return connection.isActive();
     }
 
+    @Override
     public int getDispatchQueueSize() {
         return connection.getDispatchQueueSize();
     }
@@ -70,10 +90,12 @@ public class ConnectionView implements C
     /**
      * Resets the statistics
      */
+    @Override
     public void resetStatistics() {
         connection.getStatistics().reset();
     }
 
+    @Override
     public String getRemoteAddress() {
         return connection.getRemoteAddress();
     }
@@ -90,4 +112,62 @@ public class ConnectionView implements C
     public void setUserName(String userName) {
         this.userName = userName;
     }
+
+    @Override
+    public ObjectName[] getConsumers() {
+        ObjectName[] result = null;
+
+        if (connection != null && managementContext != null) {
+
+            try {
+                ObjectName query = createConsumerQueury(connection.getConnectionId());
+                Set<ObjectName> names = managementContext.queryNames(query, null);
+                result = names.toArray(new ObjectName[0]);
+            } catch (Exception e) {
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public ObjectName[] getProducers() {
+        ObjectName[] result = null;
+
+        if (connection != null && managementContext != null) {
+
+            try {
+                ObjectName query = createProducerQueury(connection.getConnectionId());
+                Set<ObjectName> names = managementContext.queryNames(query, null);
+                result = names.toArray(new ObjectName[0]);
+            } catch (Exception e) {
+            }
+        }
+
+        return result;
+    }
+
+    private ObjectName createConsumerQueury(String clientId) throws IOException {
+        try {
+            return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=*,"
+                                  + "Type=Subscription,persistentMode=*,"
+                                  + "destinationType=*,destinationName=*,"
+                                  + "clientId=" + JMXSupport.encodeObjectNamePart(clientId)
+ ","
+                                  + "consumerId=*");
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    private ObjectName createProducerQueury(String clientId) throws IOException {
+        try {
+            return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=*,"
+                                  + "Type=Producer,"
+                                  + "destinationType=*,destinationName=*,"
+                                  + "clientId=" + JMXSupport.encodeObjectNamePart(clientId)
+ ","
+                                  + "producerId=*");
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionViewMBean.java?rev=1296720&r1=1296719&r2=1296720&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionViewMBean.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionViewMBean.java
Sat Mar  3 23:30:09 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import javax.management.ObjectName;
+
 import org.apache.activemq.Service;
 
 public interface ConnectionViewMBean extends Service {
@@ -72,4 +74,21 @@ public interface ConnectionViewMBean ext
      */
     @MBeanInfo("User Name used to authorize creation of this connection")
     String getUserName();
+
+    /**
+     * Returns the ObjectNames of all the Consumers created by this Connection.
+     *
+     * @return the ObjectNames of all Consumers created by this Connection.
+     */
+    @MBeanInfo("The ObjectNames of all Consumers created by this Connection")
+    ObjectName[] getConsumers();
+
+    /**
+     * Returns the ObjectNames of all the Producers created by this Connection.
+     *
+     * @return the ObjectNames of all Producers created by this Connection.
+     */
+    @MBeanInfo("The ObjectNames of all Producers created by this Connection")
+    ObjectName[] getProducers();
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java?rev=1296720&r1=1296719&r2=1296720&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
Sat Mar  3 23:30:09 2012
@@ -54,7 +54,7 @@ public class ManagedTransportConnection 
         super(connector, transport, broker, factory);
         this.managementContext = context;
         this.connectorName = connectorName;
-        this.mbean = new ConnectionView(this);
+        this.mbean = new ConnectionView(this, managementContext);
         this.populateUserName = broker.getBrokerService().isPopulateUserNameInMBeans();
         if (managementContext.isAllowRemoteAddressInMBeanNames()) {
             byAddressName = createByAddressObjectName("address", transport.getRemoteAddress());

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1296720&r1=1296719&r2=1296720&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Sat Mar  3 23:30:09 2012
@@ -900,8 +900,8 @@ public class MBeanTest extends EmbeddedB
         connection.setClientID("MBeanTest");
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Destination queue = session.createQueue(getDestinationString() + ".Queue");
-        @SuppressWarnings("unused")
         MessageConsumer queueConsumer = session.createConsumer(queue);
+        MessageProducer producer = session.createProducer(queue);
 
         ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
         BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
@@ -911,6 +911,7 @@ public class MBeanTest extends EmbeddedB
         assertTrue(broker.getQueueSubscribers().length == 1);
 
         ObjectName subscriptionName = broker.getQueueSubscribers()[0];
+        LOG.info("Looking for Subscription: " + subscriptionName);
 
         SubscriptionViewMBean subscriberView =
             (SubscriptionViewMBean)MBeanServerInvocationHandler.newProxyInstance(
@@ -918,11 +919,37 @@ public class MBeanTest extends EmbeddedB
         assertNotNull(subscriberView);
 
         ObjectName connectionName = subscriberView.getConnection();
+        LOG.info("Looking for Connection: " + connectionName);
         assertNotNull(connectionName);
         ConnectionViewMBean connectionView =
             (ConnectionViewMBean)MBeanServerInvocationHandler.newProxyInstance(
                     mbeanServer, connectionName, ConnectionViewMBean.class, true);
         assertNotNull(connectionView);
+
+        // Our consumer plus one advisory consumer.
+        assertEquals(2, connectionView.getConsumers().length);
+
+        // Check that the subscription view we found earlier is in this list.
+        boolean found = false;
+        for (ObjectName name : connectionView.getConsumers()) {
+            if (name.equals(subscriptionName)) {
+                found = true;
+            }
+        }
+        assertTrue("We should have found: " + subscriptionName, found);
+
+        // Our producer and no others.
+        assertEquals(1, connectionView.getProducers().length);
+
+        // Bean should detect the updates.
+        queueConsumer.close();
+        producer.close();
+
+        Thread.sleep(200);
+
+        // Only an advisory consumers now.
+        assertEquals(1, connectionView.getConsumers().length);
+        assertEquals(0, connectionView.getProducers().length);
     }
 
     public void testUserNamePopulated() throws Exception {



Mime
View raw message