activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1162140 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/ apollo-web...
Date Fri, 26 Aug 2011 15:18:54 GMT
Author: chirino
Date: Fri Aug 26 15:18:53 2011
New Revision: 1162140

URL: http://svn.apache.org/viewvc?rev=1162140&view=rev
Log:
- Realized that even Topics can own Queues, so simplified the Topic/Queue metrics down to just a DestMetricsDTO
-

Added:
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateDestMetricsDTO.java
      - copied, changed from r1162139, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestMetricsDTO.java
      - copied, changed from r1162139, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
Removed:
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateTopicMetricsDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationMetricsDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicMetricsDTO.java
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Fri Aug 26 15:18:53 2011
@@ -29,6 +29,48 @@ import java.util.{Arrays, ArrayList}
 import collection.mutable.{LinkedHashMap, HashMap}
 import collection.{Iterable, JavaConversions}
 
+object DestinationMetricsSupport {
+
+  def add_destination_metrics(to:DestMetricsDTO, from:DestMetricsDTO) = {
+    to.enqueue_item_counter += from.enqueue_item_counter
+    to.enqueue_size_counter += from.enqueue_size_counter
+    to.enqueue_ts = to.enqueue_ts max from.enqueue_ts
+
+    to.dequeue_item_counter += from.dequeue_item_counter
+    to.dequeue_size_counter += from.dequeue_size_counter
+    to.dequeue_ts = to.dequeue_ts max from.dequeue_ts
+
+    to.producer_counter += from.producer_counter
+    to.consumer_counter += from.consumer_counter
+    to.producer_count += from.producer_count
+    to.consumer_count += from.consumer_count
+
+    to.nack_item_counter += from.nack_item_counter
+    to.nack_size_counter += from.nack_size_counter
+    to.nack_ts = to.nack_ts max from.nack_ts
+
+    to.expired_item_counter += from.expired_item_counter
+    to.expired_size_counter += from.expired_size_counter
+    to.expired_ts = to.expired_ts max from.expired_ts
+
+    to.queue_size += from.queue_size
+    to.queue_items += from.queue_items
+
+    to.swap_out_item_counter += from.swap_out_item_counter
+    to.swap_out_size_counter += from.swap_out_size_counter
+    to.swap_in_item_counter += from.swap_in_item_counter
+    to.swap_in_size_counter += from.swap_in_size_counter
+
+    to.swapping_in_size += from.swapping_in_size
+    to.swapping_out_size += from.swapping_out_size
+
+    to.swapped_in_items += from.swapped_in_items
+    to.swapped_in_size += from.swapped_in_size
+    to.swapped_in_size_max += from.swapped_in_size_max
+  }
+
+}
+
 /**
  * <p>
  * </p>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Aug 26 15:18:53 2011
@@ -207,8 +207,8 @@ class Queue(val router: LocalRouter, val
   var auto_delete_after = 0
   var idled_at = 0L
 
-  def get_queue_metrics:QueueMetricsDTO = {
-    val rc = new QueueMetricsDTO
+  def get_queue_metrics:DestMetricsDTO = {
+    val rc = new DestMetricsDTO
 
     rc.enqueue_item_counter = this.enqueue_item_counter
     rc.enqueue_size_counter = this.enqueue_size_counter
@@ -1827,7 +1827,7 @@ class Subscription(val queue:Queue, val 
 
   def refill_prefetch = {
 
-    var next = if( pos.is_tail ) {
+    var cursor = if( pos.is_tail ) {
       null // can't prefetch the tail..
     } else if( pos.is_head ) {
       pos.getNext // can't prefetch the head.
@@ -1836,18 +1836,19 @@ class Subscription(val queue:Queue, val 
     }
 
     var remaining = queue.tune_consumer_buffer - acquired_size
-    while( remaining>0 && next!=null ) {
-      remaining -= next.size
-      next.prefetch_flags = (next.prefetch_flags | PREFTCH_LOAD_FLAG).toByte
-      next.load
-      next = next.getNext
+    while( remaining>0 && cursor!=null ) {
+      val next = cursor.getNext
+      remaining -= cursor.size
+      cursor.prefetch_flags = (cursor.prefetch_flags | PREFTCH_LOAD_FLAG).toByte
+      cursor.load
+      cursor = next
     }
 
     remaining = avg_advanced_size
-    while( remaining>0 && next!=null ) {
-      remaining -= next.size
-      next.prefetch_flags = (next.prefetch_flags | PREFTCH_HOLD_FLAG).toByte
-      next = next.getNext
+    while( remaining>0 && cursor!=null ) {
+      remaining -= cursor.size
+      cursor.prefetch_flags = (cursor.prefetch_flags | PREFTCH_HOLD_FLAG).toByte
+      cursor = cursor.getNext
     }
 
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Fri Aug 26 15:18:53 2011
@@ -195,6 +195,14 @@ class Topic(val router:LocalRouter, val 
       rc.metrics.dequeue_ts = rc.metrics.dequeue_ts max link.enqueue_ts
     }
 
+    // Add in any queue metrics that the topic may own.
+    for(queue <- consumer_queues.values) {
+      val metrics = queue.get_queue_metrics
+      metrics.enqueue_item_counter = 0
+      metrics.enqueue_size_counter = 0
+      metrics.enqueue_ts = 0
+      DestinationMetricsSupport.add_destination_metrics(rc.metrics, metrics)
+    }
     rc
   }
 

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateDestMetricsDTO.java
(from r1162139, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateDestMetricsDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateDestMetricsDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java&r1=1162139&r2=1162140&rev=1162140&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateDestMetricsDTO.java
Fri Aug 26 15:18:53 2011
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
 import javax.xml.bind.annotation.*;
 
 /**
@@ -26,9 +24,9 @@ import javax.xml.bind.annotation.*;
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="aggregate_queue_metrics")
+@XmlRootElement(name="aggregate_dest_metrics")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class AggregateQueueMetricsDTO extends QueueMetricsDTO {
+public class AggregateDestMetricsDTO extends DestMetricsDTO {
 
     /**
      * The number of objects which were aggregated.

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestMetricsDTO.java
(from r1162139, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestMetricsDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestMetricsDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java&r1=1162139&r2=1162140&rev=1162140&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestMetricsDTO.java
Fri Aug 26 15:18:53 2011
@@ -37,9 +37,82 @@ import javax.xml.bind.annotation.XmlRoot
  * </p>
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="queue_metrics")
+@XmlRootElement(name="dest_metrics")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class QueueMetricsDTO extends DestinationMetricsDTO {
+public class DestMetricsDTO {
+    /**
+     * The current time on the broker machine.  In milliseconds since the epoch.
+     */
+	@XmlAttribute(name="current_time")
+	public long current_time;
+
+    /**
+     * The number of messages that have been sent to the destination.
+     */
+    @XmlAttribute(name="enqueue_item_counter")
+    public long enqueue_item_counter;
+
+    /**
+     * The total size in bytes of messages that have been sent
+     * to the destination
+     */
+    @XmlAttribute(name="enqueue_size_counter")
+    public long enqueue_size_counter;
+
+    /**
+     * The time stamp of when the last message was sent to the destination.
+     */
+    @XmlAttribute(name="enqueue_ts")
+    public long enqueue_ts;
+
+    /**
+     * The number of messages that have been sent to consumers on
+     * the destination.
+     */
+    @XmlAttribute(name="dequeue_item_counter")
+    public long dequeue_item_counter;
+
+    /**
+     * The total size in bytes of messages that have been sent to consumers on
+     * the destination.
+     */
+    @XmlAttribute(name="dequeue_size_counter")
+    public long dequeue_size_counter;
+
+    /**
+     * The time stamp of when the last dequeue to a consumer occurred.
+     */
+    @XmlAttribute(name="dequeue_ts")
+    public long dequeue_ts;
+
+    /**
+     * The total number of producers that have ever sent to
+     * the destination.
+     */
+    @XmlAttribute(name="producer_counter")
+    public long producer_counter;
+
+    /**
+     * The total number of consumers that have ever subscribed to
+     * the destination.
+     */
+    @XmlAttribute(name="consumer_counter")
+    public long consumer_counter;
+
+
+    /**
+     * The current number of producers sending to
+     * the destination.
+     */
+    @XmlAttribute(name="producer_count")
+    public long producer_count;
+
+    /**
+     * The current number of consumers consuming from the destination.
+     */
+    @XmlAttribute(name="consumer_count")
+    public long consumer_count;
+
     /**
      * The number of messages which expired before they could be processed.
      */

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
Fri Aug 26 15:18:53 2011
@@ -37,7 +37,7 @@ public class QueueStatusDTO extends Serv
     public DestinationDTO binding;
 
     @XmlElement
-    public QueueMetricsDTO metrics = new QueueMetricsDTO();
+    public DestMetricsDTO metrics = new DestMetricsDTO();
 
     /**
      * Status of the entries in the queue

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java
Fri Aug 26 15:18:53 2011
@@ -51,7 +51,7 @@ public class TopicStatusDTO extends Serv
     @XmlElement(name="dsub")
     public List<String> dsubs = new ArrayList<String>();
 
-    @XmlElement
-    public TopicMetricsDTO metrics = new TopicMetricsDTO();
+    @XmlElement(name="metrics")
+    public DestMetricsDTO metrics = new DestMetricsDTO();
 
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
Fri Aug 26 15:18:53 2011
@@ -46,7 +46,6 @@ QueueConsumerLinkDTO
 ValueDTO
 StringListDTO
 DataPageDTO
-AggregateTopicMetricsDTO
-AggregateQueueMetricsDTO
-DestinationMetricsDTO
+AggregateDestMetricsDTO
+DestMetricsDTO
 AggregateConnectionMetricsDTO
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
Fri Aug 26 15:18:53 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.apollo.web.r
 import org.apache.activemq.apollo.dto._
 import java.{lang => jl}
 import org.fusesource.hawtdispatch._
-import org.apache.activemq.apollo.broker._
 import scala.collection.Iterable
 import org.apache.activemq.apollo.util.path.PathParser
 import org.apache.activemq.apollo.util._
@@ -33,6 +32,7 @@ import org.josql.{QueryResults, Query}
 import java.util.regex.Pattern
 import javax.servlet.http.HttpServletResponse
 import java.util.{Collections, ArrayList}
+import org.apache.activemq.apollo.broker._
 
 /**
  * <p>
@@ -205,8 +205,8 @@ case class BrokerResource() extends Reso
 
   @GET
   @Path("queue-metrics")
-  def get_queue_metrics(): AggregateQueueMetricsDTO = {
-    val rc:AggregateQueueMetricsDTO = with_broker { broker =>
+  def get_queue_metrics(): AggregateDestMetricsDTO = {
+    val rc:AggregateDestMetricsDTO = with_broker { broker =>
       monitoring(broker) {
         get_queue_metrics(broker)
       }
@@ -217,8 +217,8 @@ case class BrokerResource() extends Reso
 
   @GET
   @Path("topic-metrics")
-  def get_topic_metrics(): AggregateTopicMetricsDTO = {
-    val rc:AggregateTopicMetricsDTO = with_broker { broker =>
+  def get_topic_metrics(): AggregateDestMetricsDTO = {
+    val rc:AggregateDestMetricsDTO = with_broker { broker =>
       monitoring(broker) {
         get_topic_metrics(broker)
       }
@@ -229,8 +229,8 @@ case class BrokerResource() extends Reso
 
   @GET
   @Path("dsub-metrics")
-  def get_dsub_metrics(): AggregateQueueMetricsDTO = {
-    val rc:AggregateQueueMetricsDTO = with_broker { broker =>
+  def get_dsub_metrics(): AggregateDestMetricsDTO = {
+    val rc:AggregateDestMetricsDTO = with_broker { broker =>
       monitoring(broker) {
         get_dsub_metrics(broker)
       }
@@ -239,14 +239,14 @@ case class BrokerResource() extends Reso
     rc
   }
 
-  def aggregate(queue:AggregateQueueMetricsDTO, topic:AggregateTopicMetricsDTO, dsub:AggregateQueueMetricsDTO):AggregateQueueMetricsDTO
= {
+  def aggregate(queue:AggregateDestMetricsDTO, topic:AggregateDestMetricsDTO, dsub:AggregateDestMetricsDTO):AggregateDestMetricsDTO
= {
     // zero out the enqueue stats on the dsubs since they will already be accounted for in the topic
     // stats.
     dsub.enqueue_item_counter = 0
     dsub.enqueue_size_counter = 0
     dsub.enqueue_ts = 0
-    val rc = aggregate_queue_metrics(List(queue, dsub))
-    add_destination_metrics(rc, topic)
+    val rc = aggregate_dest_metrics(List(queue, dsub))
+    DestinationMetricsSupport.add_destination_metrics(rc, topic)
     rc.objects += topic.objects
     rc.current_time = now
     rc
@@ -254,119 +254,67 @@ case class BrokerResource() extends Reso
 
   @GET
   @Path("dest-metrics")
-  def get_dest_metrics(): AggregateQueueMetricsDTO = {
+  def get_dest_metrics(): AggregateDestMetricsDTO = {
     aggregate(get_queue_metrics(), get_topic_metrics(), get_dsub_metrics())
   }
 
-  def add_destination_metrics(to:DestinationMetricsDTO, from:DestinationMetricsDTO) = {
-    to.enqueue_item_counter += from.enqueue_item_counter
-    to.enqueue_size_counter += from.enqueue_size_counter
-    to.enqueue_ts = to.enqueue_ts max from.enqueue_ts
-
-    to.dequeue_item_counter += from.dequeue_item_counter
-    to.dequeue_size_counter += from.dequeue_size_counter
-    to.dequeue_ts = to.dequeue_ts max from.dequeue_ts
-
-    to.producer_counter += from.producer_counter
-    to.consumer_counter += from.consumer_counter
-    to.producer_count += from.producer_count
-    to.consumer_count += from.consumer_count
-  }
-
-  def aggregate_queue_metrics(metrics:Iterable[QueueMetricsDTO]):AggregateQueueMetricsDTO
= {
-    metrics.foldLeft(new AggregateQueueMetricsDTO){ (memo, metric)=>
-      add_destination_metrics(memo, metric)
-
-      memo.nack_item_counter += metric.nack_item_counter
-      memo.nack_size_counter += metric.nack_size_counter
-      memo.nack_ts = memo.nack_ts max metric.nack_ts
-
-      memo.expired_item_counter += metric.expired_item_counter
-      memo.expired_size_counter += metric.expired_size_counter
-      memo.expired_ts = memo.expired_ts max metric.expired_ts
-
-      memo.queue_size += metric.queue_size
-      memo.queue_items += metric.queue_items
-
-      memo.swap_out_item_counter += metric.swap_out_item_counter
-      memo.swap_out_size_counter += metric.swap_out_size_counter
-      memo.swap_in_item_counter += metric.swap_in_item_counter
-      memo.swap_in_size_counter += metric.swap_in_size_counter
-
-      memo.swapping_in_size += metric.swapping_in_size
-      memo.swapping_out_size += metric.swapping_out_size
-
-      memo.swapped_in_items += metric.swapped_in_items
-      memo.swapped_in_size += metric.swapped_in_size
-
-      memo.swapped_in_size_max += metric.swapped_in_size_max
-
-      if( metric.isInstanceOf[AggregateQueueMetricsDTO] ) {
-        memo.objects += metric.asInstanceOf[AggregateQueueMetricsDTO].objects
-      } else {
-        memo.objects += 1
-      }
-      memo
-    }
-  }
-
-  def aggregate_topic_metrics(metrics:Iterable[TopicMetricsDTO]):AggregateTopicMetricsDTO
= {
-    metrics.foldLeft(new AggregateTopicMetricsDTO){ (memo, metric)=>
-      add_destination_metrics(memo, metric)
-      if( metric.isInstanceOf[AggregateTopicMetricsDTO] ) {
-        memo.objects += metric.asInstanceOf[AggregateTopicMetricsDTO].objects
+  def aggregate_dest_metrics(metrics:Iterable[DestMetricsDTO]):AggregateDestMetricsDTO =
{
+    metrics.foldLeft(new AggregateDestMetricsDTO){ (to, from)=>
+      DestinationMetricsSupport.add_destination_metrics(to, from)
+      if( from.isInstanceOf[AggregateDestMetricsDTO] ) {
+        to.objects += from.asInstanceOf[AggregateDestMetricsDTO].objects
       } else {
-        memo.objects += 1
+        to.objects += 1
       }
-      memo
+      to
     }
   }
 
-  def get_queue_metrics(broker:Broker):FutureResult[AggregateQueueMetricsDTO] = {
+  def get_queue_metrics(broker:Broker):FutureResult[AggregateDestMetricsDTO] = {
     val metrics = sync_all(broker.virtual_hosts.values) { host =>
       get_queue_metrics(host)
     }
-    metrics.map( x=> Success(aggregate_queue_metrics(x.flatMap(_.success_option)) ))
+    metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option)) ))
   }
 
-  def get_queue_metrics(host:VirtualHost):FutureResult[AggregateQueueMetricsDTO] = {
+  def get_queue_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = {
     val router:LocalRouter = host
     val queues: Iterable[Queue] = router.queue_domain.destinations
     val metrics = sync_all(queues) { queue =>
       queue.get_queue_metrics
     }
-    metrics.map( x=> Success(aggregate_queue_metrics(x.flatMap(_.success_option))) )
+    metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option))) )
   }
 
 
-  def get_topic_metrics(broker:Broker):FutureResult[AggregateTopicMetricsDTO] = {
+  def get_topic_metrics(broker:Broker):FutureResult[AggregateDestMetricsDTO] = {
     val metrics = sync_all(broker.virtual_hosts.values) { host =>
       get_topic_metrics(host)
     }
-    metrics.map( x=> Success(aggregate_topic_metrics(x.flatMap(_.success_option)) ))
+    metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option)) ))
   }
 
-  def get_topic_metrics(host:VirtualHost):FutureResult[AggregateTopicMetricsDTO] = {
+  def get_topic_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = {
     val router:LocalRouter = host
     val topics: Iterable[Topic] = router.topic_domain.destinations
     val metrics = topics.map(_.status.metrics)
-    FutureResult(Success(aggregate_topic_metrics(metrics)))
+    FutureResult(Success(aggregate_dest_metrics(metrics)))
   }
 
-  def get_dsub_metrics(broker:Broker):FutureResult[AggregateQueueMetricsDTO] = {
+  def get_dsub_metrics(broker:Broker):FutureResult[AggregateDestMetricsDTO] = {
     val metrics = sync_all(broker.virtual_hosts.values) { host =>
       get_dsub_metrics(host)
     }
-    metrics.map( x=> Success(aggregate_queue_metrics(x.flatMap(_.success_option)) ))
+    metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option)) ))
   }
 
-  def get_dsub_metrics(host:VirtualHost):FutureResult[AggregateQueueMetricsDTO] = {
+  def get_dsub_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = {
     val router:LocalRouter = host
     val dsubs: Iterable[Queue] = router.topic_domain.durable_subscriptions_by_id.values
     val metrics = sync_all(dsubs) { dsub =>
       dsub.get_queue_metrics
     }
-    metrics.map( x=> Success(aggregate_queue_metrics(x.flatMap(_.success_option))) )
+    metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option))) )
   }
 
 
@@ -418,8 +366,8 @@ case class BrokerResource() extends Reso
   }
 
   @GET @Path("virtual-hosts/{id}/queue-metrics")
-  def virtual_host_queue_metrics(@PathParam("id") id : String): AggregateQueueMetricsDTO
= {
-    val rc:AggregateQueueMetricsDTO = with_virtual_host(id) { host =>
+  def virtual_host_queue_metrics(@PathParam("id") id : String): AggregateDestMetricsDTO =
{
+    val rc:AggregateDestMetricsDTO = with_virtual_host(id) { host =>
       monitoring(host) {
         get_queue_metrics(host)
       }
@@ -429,8 +377,8 @@ case class BrokerResource() extends Reso
   }
 
   @GET @Path("virtual-hosts/{id}/topic-metrics")
-  def virtual_host_topic_metrics(@PathParam("id") id : String): AggregateTopicMetricsDTO
= {
-    val rc:AggregateTopicMetricsDTO = with_virtual_host(id) { host =>
+  def virtual_host_topic_metrics(@PathParam("id") id : String): AggregateDestMetricsDTO =
{
+    val rc:AggregateDestMetricsDTO = with_virtual_host(id) { host =>
       monitoring(host) {
         get_topic_metrics(host)
       }
@@ -440,8 +388,8 @@ case class BrokerResource() extends Reso
   }
 
   @GET @Path("virtual-hosts/{id}/dsub-metrics")
-  def virtual_host_dsub_metrics(@PathParam("id") id : String): AggregateQueueMetricsDTO =
{
-    val rc:AggregateQueueMetricsDTO = with_virtual_host(id) { host =>
+  def virtual_host_dsub_metrics(@PathParam("id") id : String): AggregateDestMetricsDTO =
{
+    val rc:AggregateDestMetricsDTO = with_virtual_host(id) { host =>
       monitoring(host) {
         get_dsub_metrics(host)
       }
@@ -451,7 +399,7 @@ case class BrokerResource() extends Reso
   }
 
   @GET @Path("virtual-hosts/{id}/dest-metrics")
-  def virtual_host_dest_metrics(@PathParam("id") id : String): AggregateQueueMetricsDTO =
{
+  def virtual_host_dest_metrics(@PathParam("id") id : String): AggregateDestMetricsDTO =
{
     aggregate(virtual_host_queue_metrics(id), virtual_host_topic_metrics(id), virtual_host_dsub_metrics(id))
   }
 



Mime
View raw message