activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "martin@attivio.com" <mar...@attivio.com>
Subject Re: Inspecting queue depth with ActiveMQ API vs JMX
Date Thu, 24 Mar 2011 19:07:02 GMT
To be honest I've had a tough time doing this with the API.  Our goals may be
a bit different from yours, we are trying to determine how many messages are
on the queue that have not been picked up by a consumer.  One of the issues
is dealing with prefetch sizes for different consumers and taking that into
account.  We have not found a simple queue size api call.  We use version
5.3.2.

But here is the code I have been using:

  /** Calculate queue the size.  A complication in determining the queue
size is that the jms broker
   * will preemptively dispatch messages to registered consumers.  The
number of messages sent in
   * such a manner is equal to the prefetch size of the consumer.  The
"inflight" statistic for the
   * queue will include all prefetched messages and all messages which have
been received but are as
   * yet unacknowledged.  The "messages" statistic is all messages which are
actually still held by
   * the queue.  For SEDA purposes we want to limit the number of queued
messages when they are not
   * being pulled of by active processing components.  So we compute the
queue size as:
   *
   *  "messages" - ("inflight" - "prefetched")
   *
   *  This results in the real size of the queue being the number of
unconsumed messages + (num consumers * prefetch size)
   *  */
  @Override
  public int getQueueSize(String endpoint) {
    try {
      Map&lt;ActiveMQDestination, Destination&gt; dmap =
_broker.getBroker().getDestinationMap();
      ActiveMQQueue dstQueue = new ActiveMQQueue(endpoint);
      Destination dst = dmap.get(dstQueue);
      DestinationStatistics stats = dst.getDestinationStatistics();
      int currentPrefetch = 0;
      for (Subscription sub : dst.getConsumers()) {
        currentPrefetch += sub.getConsumerInfo().getCurrentPrefetchSize();
      }
      if (log.isTraceEnabled())
        log.trace("msgs:%d, inflight:%d, cached:%d, enqueues:%d,
dequeues:%d, consumerCnt:%d, curPrefetch:%d", 
            stats.getMessages().getCount(), stats.getInflight().getCount(),
stats.getMessagesCached().getCount(), stats.getEnqueues().getCount(),
stats.getDequeues().getCount(),
            dst.getConsumers().size(), currentPrefetch);
      int qsize = (int)
(stats.getMessages().getCount()-stats.getInflight().getCount());
      if (qsize > currentPrefetch) qsize -= currentPrefetch;
      return qsize;
    } catch (Exception e) {
      log.ignoreException(e);
    }
    return 0;
  }



--
View this message in context: http://activemq.2283324.n4.nabble.com/Inspecting-queue-depth-with-ActiveMQ-API-vs-JMX-tp3399573p3403464.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message