activemq-users mailing list archives

From Andreas Gies <andr...@soa-knowledge.net>
Subject Re: Programatically determining number of consumers and pending messages
Date Wed, 22 Apr 2009 06:58:12 GMT
Hi there,

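You could use the JMX API for that.

Here are a few convenient JMX helper methods (they need imports from
javax.management and javax.management.remote, plus a logger and JUnit's
assertEquals):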

     private MBeanInfo getMBeanInfo(String brokerName, String type, String pattern) throws Exception {
         Set<?> mBeans = getMBeans(brokerName, type, pattern);
         assertEquals(1, mBeans.size());

         return getMBeanServerConnection(brokerName)
             .getMBeanInfo(getObjectName(brokerName, type, pattern));
     }

     private Set<?> getMBeans(String brokerName, String type) throws Exception {
         return getMBeans(brokerName, type, "*", 0);
     }

     private Set<?> getMBeans(String brokerName, String type, String pattern) throws Exception {
         return getMBeans(brokerName, type, pattern, 0);
     }

     // Queries the matching MBeans; with timeout > 0 (milliseconds) it polls
     // every 100 ms until at least one match is found or the timeout expires.
     private Set<?> getMBeans(String brokerName, String type, String pattern, int timeout) throws Exception {
         final long expiryTime = System.currentTimeMillis() + timeout;
         final ObjectName beanName = getObjectName(brokerName, type, pattern);

         Set<?> mbeans = null;
         do {
             if (timeout > 0) {
                 Thread.sleep(100);
             }
             MBeanServerConnection mbsc = getMBeanServerConnection(brokerName);
             if (mbsc != null) {
                 LOG.info("Query name: " + beanName);
                 mbeans = mbsc.queryMBeans(beanName, null);
             }
         } while ((mbeans == null || mbeans.isEmpty()) && expiryTime > System.currentTimeMillis());
         return mbeans;
     }

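     // Returns null if the JMX connection cannot be established, so callers
     // that poll (see getMBeans) can retry. Note that the JMXConnector is
     // never closed here, which is acceptable for a test but not for
     // long-running code.
     private MBeanServerConnection getMBeanServerConnection(String brokerName) throws Exception {
         int jmxPort = getBrokerService(brokerName).getManagementContext().getConnectorPort();

         final JMXServiceURL url = new JMXServiceURL(
             "service:jmx:rmi:///jndi/rmi://localhost:" + jmxPort + "/jmxrmi");
         MBeanServerConnection mbsc = null;
         try {
             JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
             mbsc = jmxc.getMBeanServerConnection();
         } catch (Exception ignored) {
             // Broker not reachable (yet); fall through and return null.
         }
         return mbsc;
     }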

     private Object getAttribute(String brokerName, String type, String pattern, String attrName) throws Exception {
         // Called only for its check that exactly one MBean matches.
         getMBeanInfo(brokerName, type, pattern);
         return getMBeanServerConnection(brokerName)
             .getAttribute(getObjectName(brokerName, type, pattern), attrName);
     }

     private void queryMBean(String brokerName, String type, String pattern) throws Exception {
         ObjectName objName = getObjectName(brokerName, type, pattern);
         MBeanServerConnection mbsc = getMBeanServerConnection(brokerName);
         MBeanInfo info = mbsc.getMBeanInfo(objName);

         for (MBeanAttributeInfo attr : info.getAttributes()) {
             LOG.info("Found attribute : " + attr.getName());
         }
         for (MBeanOperationInfo op : info.getOperations()) {
             LOG.info("Found operation : " + op.getName());
         }

         // Read and log the current consumer count.
         LOG.info("ConsumerCount : " + mbsc.getAttribute(objName, "ConsumerCount"));
     }

     private ObjectName getObjectName(String brokerName, String type, String pattern) throws Exception {
         return new ObjectName(
             "org.apache.activemq:BrokerName=" + brokerName + ",Type=" + type + "," + pattern);
     }

You probably only need the code that obtains the MBeanServer connection
and queries the attribute. You may have to tweak the methods to fit your
needs; I took them straight from a test case.

An example to query the number of consumers:

     consumers_A = getAttribute(brokerAName, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
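
Since the original question also asked about pending messages, here is a
minimal standalone sketch rather than a definitive implementation. It
assumes the broker's JMX connector listens on localhost:1099 and that the
queue is called TEST.QUEUE; both are placeholders. Besides ConsumerCount it
reads the QueueSize attribute of the queue MBean for the number of pending
messages, which is an addition not covered by the helpers above. The class
and method names (QueueStats, queueObjectName, readLong) are made up for
illustration. The object name follows the format used in this message;
other ActiveMQ versions may name their MBeans differently, so it is worth
checking with jconsole first.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

class QueueStats {

    // Builds the queue MBean name in the format used in this message.
    // Assumption: the queue name contains no characters that need quoting.
    static ObjectName queueObjectName(String brokerName, String queueName) throws Exception {
        return new ObjectName("org.apache.activemq:BrokerName=" + brokerName
                + ",Type=Queue,Destination=" + queueName);
    }

    // Reads a numeric attribute and widens it to long.
    static long readLong(MBeanServerConnection mbsc, ObjectName name, String attribute)
            throws Exception {
        return ((Number) mbsc.getAttribute(name, attribute)).longValue();
    }

    public static void main(String[] args) throws Exception {
        // Placeholder values; adjust them to your broker.
        String brokerName = "localhost";
        String queueName = "TEST.QUEUE";
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");

        // try-with-resources closes the JMX connector when done.
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            ObjectName queue = queueObjectName(brokerName, queueName);
            System.out.println("consumers: " + readLong(mbsc, queue, "ConsumerCount"));
            System.out.println("pending:   " + readLong(mbsc, queue, "QueueSize"));
        }
    }
}
```

A producer could call something like this once on startup and then switch
to advisory messages to follow later changes.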

Hope that helps

Andreas

On Apr 22, 2009, at 5:10 AM, BigPic wrote:

>
> I have a setup with one fast producer and a variable number of slow
> consumers.  I would like the producer, on startup, to adjust itself to the
> number of consumers and the number of currently pending messages.
>
> I know that I can use Advisory Messages
> (http://activemq.apache.org/advisory-message.html) to tell when consumers
> start and stop listening, but short of becoming a fake consumer, I haven't
> found a way for the producer to tell on startup what the current situation
> is.  It seems the "Command Agent" might help, but I can't find any
> documentation for it.
>
> Does anyone have any recommendations?
>
> Thanks.
>
>
> -- 
> View this message in context: http://www.nabble.com/Programatically-determining-number-of-consumers-and-pending-messages-tp23168916p23168916.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

