activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1347848 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ ap...
Date Fri, 08 Jun 2012 00:47:37 GMT
Author: chirino
Date: Fri Jun  8 00:47:35 2012
New Revision: 1347848

URL: http://svn.apache.org/viewvc?rev=1347848&view=rev
Log:
Making more progress on implementing network bridging.  Test case is now passing a message
between Apollo brokers via an automatically deployed bridge due to consumer demand.

Added:
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConsumerLoadDTO.java
      - copied, changed from r1347662, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CircularBuffer.scala
      - copied, changed from r1347662, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
    activemq/activemq-apollo/trunk/apollo-network/pom.xml
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java
    activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml
    activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml
    activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Fri Jun  8 00:47:35 2012
@@ -65,6 +65,8 @@ trait DeliveryConsumer extends Retained 
   def start_from_tail = false
   def set_starting_seq(seq:Long) = {}
 
+  def user:String = null
+  def jms_selector:String = null
   def browser = false
   def exclusive = false
   def dispatch_queue:DispatchQueue;

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Jun  8 00:47:35 2012
@@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit
 
 import org.fusesource.hawtdispatch._
 import protocol.ProtocolFactory
-import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.util.list._
@@ -33,6 +32,8 @@ import security.{SecuredResource, Securi
 import org.apache.activemq.apollo.dto._
 import org.fusesource.hawtbuf._
 import java.util.regex.Pattern
+import java.util.ArrayList
+import collection.mutable.{ArrayBuffer, ListBuffer}
 
 sealed trait FullDropPolicy
 object Block extends FullDropPolicy
@@ -317,10 +318,19 @@ class Queue(val router: LocalRouter, val
     rc.message_size_enqueue_counter = enqueue_size_counter
     rc.message_count_dequeue_counter = dequeue_item_counter
     rc.message_size_dequeue_counter = dequeue_size_counter
-//  TODO: expose selector attribute of consumer.
-//    for( consumer <- all_subscriptions.keys ) {
-//      rc.consumer_selectors.add(consumer.selector)
-//    }
+
+    for( sub <- all_subscriptions.values ) {
+      val dto = new ConsumerLoadDTO
+      dto.user = sub.consumer.user
+      dto.selector = sub.consumer.jms_selector
+      sub.ack_rates match {
+        case Some((items_per_sec, size_per_sec) ) =>
+          dto.ack_item_rate = items_per_sec
+          dto.ack_size_rate = size_per_sec
+        case _ =>
+      }
+      rc.consumers.add(dto)
+    }
     rc
   }
   def status(entries:Boolean=false) = {
@@ -391,7 +401,14 @@ class Queue(val router: LocalRouter, val
       link.total_nack_count = sub.total_nack_count
       link.acquired_size = sub.acquired_size
       link.acquired_count = sub.acquired_count
-      link.waiting_on = if( sub.full ) {
+      sub.ack_rates match {
+        case Some((items_per_sec, size_per_sec) ) =>
+          link.ack_item_rate = items_per_sec
+          link.ack_size_rate = size_per_sec
+        case _ =>
+      }
+
+      if( sub.full ) {
         "consumer"
       } else if( sub.pos.is_tail ) {
         "producer"
@@ -472,7 +489,10 @@ class Queue(val router: LocalRouter, val
 
       // by the time this is run, consumers and producers may have already joined.
       on_completed.run
-      schedule_periodic_maintenance
+      schedule_reoccurring(1, TimeUnit.SECONDS) {
+        queue_maintenance
+      }
+
       // wake up the producers to fill us up...
       if (messages.refiller != null) {
         messages.refiller.run
@@ -895,56 +915,53 @@ class Queue(val router: LocalRouter, val
   var delivery_rate = 0L
   def swapped_out_size = queue_size - (producer_swapped_in.size + consumer_swapped_in.size)
 
-  def schedule_periodic_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
-    if( service_state.is_started ) {
-      var elapsed = System.currentTimeMillis-now
-      now += elapsed
+  def queue_maintenance:Unit = {
+    var elapsed = System.currentTimeMillis-now
+    now += elapsed
 
-      consumers_keeping_up_historically = consumers_keeping_up_counter!=0
-      consumers_keeping_up_counter = 0
-      
-      delivery_rate = 0L
+    consumers_keeping_up_historically = consumers_keeping_up_counter!=0
+    consumers_keeping_up_counter = 0
 
-      var consumer_stall_ms = 0L
-      var load_stall_ms = 0L
+    delivery_rate = 0L
 
-      all_subscriptions.values.foreach{ sub=>
-        val (cs, ls) = sub.adjust_prefetch_size
-        consumer_stall_ms += cs
-        load_stall_ms += ls
-        if(!sub.browser) {
-          delivery_rate += sub.enqueue_size_per_interval
-        }
+    var consumer_stall_ms = 0L
+    var load_stall_ms = 0L
+
+    all_subscriptions.values.foreach{ sub=>
+      val (cs, ls) = sub.adjust_prefetch_size
+      consumer_stall_ms += cs
+      load_stall_ms += ls
+      if(!sub.browser) {
+        delivery_rate += sub.enqueue_size_per_interval
       }
-      
-      val rate_adjustment = elapsed.toFloat / 1000.toFloat
-      delivery_rate  = (delivery_rate / rate_adjustment).toLong
+    }
 
-      val stall_ratio = ((consumer_stall_ms*100)+1).toFloat / ((load_stall_ms*100)+1).toFloat
+    val rate_adjustment = elapsed.toFloat / 1000.toFloat
+    delivery_rate  = (delivery_rate / rate_adjustment).toLong
 
-      // Figure out what the max enqueue rate should be.
-      max_enqueue_rate = Int.MaxValue
-      if( tune_fast_delivery_rate>=0 && tune_catchup_enqueue_rate>=0 && delivery_rate>tune_fast_delivery_rate && swapped_out_size > 0 && stall_ratio < 1.0 ) {
-        max_enqueue_rate = tune_catchup_enqueue_rate
-      }
-      if(tune_max_enqueue_rate >=0 ) {
-        max_enqueue_rate = max_enqueue_rate.min(tune_max_enqueue_rate)
-      }
-      if( max_enqueue_rate < Int.MaxValue ) {
-        if(enqueues_remaining==null) {
-          enqueues_remaining = new LongCounter()
-          enqueue_throttle_release(enqueues_remaining)
-        }
-      } else {
-        if(enqueues_remaining!=null) {
-          enqueues_remaining = null
-        }
-      }
+    val stall_ratio = ((consumer_stall_ms*100)+1).toFloat / ((load_stall_ms*100)+1).toFloat
 
-      swap_messages
-      check_idle
-      schedule_periodic_maintenance
+    // Figure out what the max enqueue rate should be.
+    max_enqueue_rate = Int.MaxValue
+    if( tune_fast_delivery_rate>=0 && tune_catchup_enqueue_rate>=0 && delivery_rate>tune_fast_delivery_rate && swapped_out_size > 0 && stall_ratio < 1.0 ) {
+      max_enqueue_rate = tune_catchup_enqueue_rate
     }
+    if(tune_max_enqueue_rate >=0 ) {
+      max_enqueue_rate = max_enqueue_rate.min(tune_max_enqueue_rate)
+    }
+    if( max_enqueue_rate < Int.MaxValue ) {
+      if(enqueues_remaining==null) {
+        enqueues_remaining = new LongCounter()
+        enqueue_throttle_release(enqueues_remaining)
+      }
+    } else {
+      if(enqueues_remaining!=null) {
+        enqueues_remaining = null
+      }
+    }
+
+    swap_messages
+    check_idle
   }
     
   var max_enqueue_rate = Int.MaxValue
@@ -1029,6 +1046,7 @@ class Queue(val router: LocalRouter, val
   
   
   def drain_acks = might_unfill {
+    val end = System.nanoTime()
     ack_source.getData.foreach {
       case (entry, consumed, uow) =>
         consumed match {
@@ -2194,8 +2212,29 @@ class Subscription(val queue:Queue, val 
   var consumer_stall_start = 0L
   var load_stall_start = 0L
 
+  var started_at = Broker.now
   var total_ack_count = 0L
+  var total_ack_size = 0L
   var total_nack_count = 0L
+
+  var idle_start = System.nanoTime()
+  var idle_total = 0L
+  
+  def ack_rates = {
+    var duration = ((Broker.now - started_at)*1000000)
+    duration -= idle_total
+    if( idle_start!=0 ) {
+      duration -= System.nanoTime() - idle_start
+    }
+
+    if( duration != 0 && total_ack_count > 0 ) {
+      val ack_rate = 1000000000d * total_ack_count / duration
+      val ack_size_rate = 1000000000d * total_ack_size / duration
+      Some((ack_rate, ack_size_rate))
+    } else {
+      None
+    }
+  }
   
   override def toString = {
     def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
@@ -2356,7 +2395,6 @@ class Subscription(val queue:Queue, val 
   }
 
   def adjust_prefetch_size = {
-
     enqueue_size_per_interval = session.enqueue_size_counter - enqueue_size_at_last_interval
     enqueue_size_at_last_interval = session.enqueue_size_counter
 
@@ -2410,6 +2448,11 @@ class Subscription(val queue:Queue, val 
 
   class AcquiredQueueEntry(val entry:QueueEntry) extends LinkedNode[AcquiredQueueEntry] {
 
+    if(acquired.isEmpty) {
+      idle_total = System.nanoTime() - idle_start
+      idle_start = 0
+    }
+    
     acquired.addLast(this)
     acquired_size += entry.size
 
@@ -2421,6 +2464,7 @@ class Subscription(val queue:Queue, val 
       }
 
       total_ack_count += 1
+      total_ack_size += entry.size
       if (entry.messageKey != -1) {
         val storeBatch = if( uow == null ) {
           queue.virtual_host.store.create_uow
@@ -2438,6 +2482,9 @@ class Subscription(val queue:Queue, val 
 
       // removes this entry from the acquired list.
       unlink()
+      if( acquired.isEmpty ) {
+        idle_start = System.nanoTime()
+      }
 
       // we may now be able to prefetch some messages..
       acquired_size -= entry.size
@@ -2492,6 +2539,9 @@ class Subscription(val queue:Queue, val 
 
       }
       unlink()
+      if( acquired.isEmpty ) {
+        idle_start = System.nanoTime()
+      }
       check_finish_close
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
Fri Jun  8 00:47:35 2012
@@ -34,6 +34,7 @@ import java.util.jar.JarInputStream
 import java.lang.String
 import org.eclipse.jetty.servlet.{FilterMapping, FilterHolder}
 import org.apache.activemq.apollo.broker.web.{AllowAnyOriginFilter, WebServer, WebServerFactory}
+import javax.servlet._
 
 /**
  * <p>
@@ -226,6 +227,14 @@ class JettyWebServer(val broker:Broker) 
               val origins = cors_origin.split(",").map(_.trim()).toSet
               context.addFilter(new FilterHolder(new AllowAnyOriginFilter(origins)), "/*", FilterMapping.DEFAULT)
             }
+            context.addFilter(new FilterHolder(new Filter(){
+              def init(p1: FilterConfig) {}
+              def destroy() {}
+              def doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain) = {
+                request.setAttribute("APOLLO_BROKER", broker)
+                chain.doFilter(request, response)
+              }
+            }), "/*", FilterMapping.DEFAULT)
 
             if( broker.tmp !=null ) {
               context.setTempDirectory(broker.tmp)

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConsumerLoadDTO.java
(from r1347662, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConsumerLoadDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConsumerLoadDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java&r1=1347662&r2=1347848&rev=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConsumerLoadDTO.java
Fri Jun  8 00:47:35 2012
@@ -14,31 +14,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.activemq.apollo.dto;
 
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
-import javax.xml.bind.annotation.*;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.HashSet;
 
 /**
+ * The
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="queue_consumer_link")
+@XmlRootElement(name="destination_load")
 @XmlAccessorType(XmlAccessType.FIELD)
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class QueueConsumerLinkDTO extends LinkDTO {
+public class ConsumerLoadDTO {
+
+    @XmlAttribute(name="user")
+    public String user;
+
+    @XmlAttribute(name="selector")
+    public String selector;
 
-    public long position = 0;
+    @XmlAttribute(name="ack_item_rate")
+    public Double ack_item_rate;
 
-    public int acquired_count;
-    public long acquired_size;
+    @XmlAttribute(name="ack_size_rate")
+    public Double ack_size_rate;
 
-    public long total_ack_count;
-    public long total_nack_count;
-
-    /**
-     * What the consumer is currently waiting on
-     */
-    @XmlAttribute(name="waiting_on")
-	public String waiting_on;
-}
\ No newline at end of file
+}

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java
Fri Jun  8 00:47:35 2012
@@ -55,6 +55,6 @@ public class DestinationLoadDTO {
     @XmlAttribute(name="message_size_dequeue_counter")
     public Long message_size_dequeue_counter;
 
-    public HashSet<String> consumer_selectors = new HashSet<String>();
+    public ArrayList<ConsumerLoadDTO> consumers = new ArrayList<ConsumerLoadDTO>();
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
Fri Jun  8 00:47:35 2012
@@ -36,6 +36,9 @@ public class QueueConsumerLinkDTO extend
     public long total_ack_count;
     public long total_nack_count;
 
+    public Double ack_item_rate;
+    public Double ack_size_rate;
+
     /**
      * What the consumer is currently waiting on
      */

Modified: activemq/activemq-apollo/trunk/apollo-network/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/pom.xml?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-network/pom.xml Fri Jun  8 00:47:35 2012
@@ -151,6 +151,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-jms_1.1_spec</artifactId>
+      <version>1.1</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
Fri Jun  8 00:47:35 2012
@@ -48,12 +48,12 @@ class RestLoadMonitor extends BaseServic
   var poll_interval = 5*1000;
 
   protected def _start(on_completed: Task) = {
-    on_completed.run()
     schedule_reoccurring(1, SECONDS) {
       for(monitor <- members.values) {
         monitor.poll
       }
     }
+    on_completed.run()
   }
 
   protected def _stop(on_completed: Task) = {
@@ -106,10 +106,12 @@ class RestLoadMonitor extends BaseServic
     }
   }
 
-  def add(member: ClusterMemberDTO) = dispatch_queue {
-    for(service <- member.services) {
-      if( service.kind == "webadmin" ) {
-        members.put(member.id, LoadMonitor(member.id, new URL(service.address)))
+  def add(member: ClusterMemberDTO) = {
+    dispatch_queue {
+      for(service <- member.services) {
+        if( service.kind == "web_admin" ) {
+          members.put(member.id, LoadMonitor(member.id, new URL(service.address)))
+        }
       }
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
Fri Jun  8 00:47:35 2012
@@ -39,10 +39,10 @@ class CounterDrivenRate {
 }
 
 class DestinationMetrics {
-  var consumer_count = 0L
   var message_size = 0L
   val enqueue_size_rate = new CounterDrivenRate()
-  val dequeue_size_rate = new CounterDrivenRate()
+  var consumer_count = 0L
+  var dequeue_size_rate = 0d
 }
 
 class BrokerMetrics() {
@@ -52,7 +52,7 @@ class BrokerMetrics() {
 //  var topic_load = HashMap[String, DestinationMetrics]()
   var timestamp = System.currentTimeMillis()
 
-  def update(current:LoadStatusDTO) = {
+  def update(current:LoadStatusDTO, network_user:String) = {
     val now = System.currentTimeMillis()
     val duration = (now - timestamp)/1000.0d
     timestamp = now
@@ -61,8 +61,18 @@ class BrokerMetrics() {
     for( dest <- current.queues ) {
       val dest_load = queue_load.get(dest.id).getOrElse(new DestinationMetrics())
       dest_load.message_size = dest.message_size
+
+      // Lets not include the network consumers in the consumer rates..
+      val consumers = dest.consumers.filter(_.user == network_user).toArray
+
+      dest_load.consumer_count = consumers.size
+      dest_load.dequeue_size_rate = 0
+      for( c <- consumers ) {
+        if( c.ack_size_rate !=null ) {
+          dest_load.dequeue_size_rate +=  c.ack_size_rate
+        }
+      }
       dest_load.enqueue_size_rate.update(dest.message_size_enqueue_counter, duration)
-      dest_load.dequeue_size_rate.update(dest.message_size_dequeue_counter, duration)
       next_queue_load += dest.id -> dest_load
     }
     queue_load = next_queue_load

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
Fri Jun  8 00:47:35 2012
@@ -93,7 +93,7 @@ class NetworkManager(broker: Broker) ext
   }
 
   def on_load_change(dto: LoadStatusDTO) = dispatch_queue {
-    metrics_map.getOrElseUpdate(dto.id, new BrokerMetrics()).update(dto)
+    metrics_map.getOrElseUpdate(dto.id, new BrokerMetrics()).update(dto, config.user)
   }
 
   def load_analysis = {
@@ -113,15 +113,16 @@ class NetworkManager(broker: Broker) ext
     val queue_demand_map = HashMap[String, DemandStatus]()
 
     for( (broker, broker_load) <- metrics_map) {
-      for( (id, dest) <- broker_load.queue_load ) {
-        val status = queue_demand_map.getOrElseUpdate(id, new DemandStatus)
-        if( can_bridge_from(broker) &&  needs_more_consumers(dest) ) {
+      for( (dest_name, dest) <- broker_load.queue_load ) {
+        val status = queue_demand_map.getOrElseUpdate(dest_name, new DemandStatus)
+        var needsmoreconsumers = needs_more_consumers(dest)
+        if( can_bridge_from(broker) && needsmoreconsumers ) {
           // The broker needs more consumers to drain the queue..
-          status.needs_consumers += (id->dest)
+          status.needs_consumers += (broker->dest)
         } else {
           // The broker can drain the queue of other brokers..
           if( can_bridge_to(broker) && dest.consumer_count > 0 ) {
-            status.has_consumers += (id->dest)
+            status.has_consumers += (broker->dest)
           }
         }
       }
@@ -129,7 +130,7 @@ class NetworkManager(broker: Broker) ext
 
     val desired_bridges = HashSet[BridgeInfo]()
     for( (id, demand) <- queue_demand_map ) {
-      for( (to, to_metrics)<- demand.needs_consumers; (from, from_metrics) <-demand.has_consumers ) {
+      for( (from, from_metrics)<- demand.needs_consumers; (to, to_metrics) <-demand.has_consumers ) {
         // we could get fancy and compare the to_metrics and from_metrics to avoid
         // setting up bridges that won't make a big difference..
         desired_bridges += BridgeInfo(from, to, "queue", id)
@@ -152,13 +153,12 @@ class NetworkManager(broker: Broker) ext
 
   }
 
-  var local_broker_id = ""
-  var enable_duplex = false
+  def local_broker_id = config.self
 
   def can_bridge_from(broker:String):Boolean = broker==local_broker_id
   def can_bridge_to(broker:String):Boolean = {
     if ( broker == local_broker_id) {
-      enable_duplex
+      OptionSupport(config.duplex).getOrElse(false)
     } else {
       true
     }
@@ -171,7 +171,7 @@ class NetworkManager(broker: Broker) ext
       return false
     }
 
-    val drain_rate = dest.dequeue_size_rate.mean - dest.enqueue_size_rate.mean
+    val drain_rate = dest.dequeue_size_rate - dest.enqueue_size_rate.mean
     if( drain_rate < 0 ) {
       // Not draining...
       return true
@@ -204,7 +204,7 @@ class NetworkManager(broker: Broker) ext
 
           // Lets look to see if we can use the strategy with services exposed by the broker..
           for( to_service <- to.services; from_service <- from.services ) {
-            if( to_service.kind==service_kind && to_service.kind==from_service.kind ) {
+            if( bridging_strategy==null && to_service.kind==service_kind && to_service.kind==from_service.kind ) {
               bridging_strategy = strategy
               bridging_strategy_info = BridgeInfo(from_service.address, to_service.address, info.kind, info.dest)
               bridging_strategy.deploy( bridging_strategy_info )

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
Fri Jun  8 00:47:35 2012
@@ -21,12 +21,12 @@ import org.fusesource.hawtdispatch._
 import java.net.URI
 import org.apache.activemq.apollo.util.{StateMachine, Log}
 import org.fusesource.hawtbuf.AsciiBuffer
-import org.fusesource.stomp.codec.StompFrame
 import org.apache.activemq.apollo.broker.Broker
 import java.util.Properties
 import org.fusesource.stomp.client.{CallbackConnection, Stomp}
 import java.util.concurrent.TimeUnit
 import collection.mutable.HashMap
+import org.fusesource.stomp.codec.StompFrame
 
 
 object StompBridgingStrategy extends Log {
@@ -40,6 +40,9 @@ class StompBridgingStrategy(val manager:
 
   val bridges = HashMap[(String, String), Bridge]()
 
+  def bridge_user = manager.config.user
+  def bridge_password = manager.config.password
+
   def deploy(info:BridgeInfo) = {
     dispatch_queue.assertExecuting()
     val bridge = bridges.getOrElseUpdate((info.from, info.to), new Bridge(info.from, info.to))
@@ -60,8 +63,28 @@ class StompBridgingStrategy(val manager:
     val from_connection = ConnectionStateMachine(new URI(from))
     val to_connection = ConnectionStateMachine(new URI(to))
 
-    from_connection.refiller = ^{
-
+    from_connection.receive_handler = frame => {
+      val original_state = from_connection.state
+      frame.action() match {
+        case MESSAGE =>
+          // forward it..
+          frame.action(SEND)
+          println("forwarding message: "+frame.getHeader(MESSAGE_ID))
+          to_connection.send(frame, ()=>{
+            // Ack it if the original connection is still up...
+            // TODO: if it's not, we will probably get a dup/redelivery.
+            // Might want to introduce some dup detection at this point.
+            if( from_connection.state eq original_state ) {
+              val ack = new StompFrame(ACK);
+              ack.addHeader(SUBSCRIPTION, frame.getHeader(SUBSCRIPTION))
+              ack.addHeader(MESSAGE_ID, frame.getHeader(MESSAGE_ID))
+              from_connection.send(ack, null)
+              println("forwarded message, now acking: "+frame.getHeader(MESSAGE_ID))
+            }
+          })
+        case _ =>
+          println("unhandled stomp frame: "+frame)
+      }
     }
 
     dispatch_queue {
@@ -75,10 +98,8 @@ class StompBridgingStrategy(val manager:
       var subscriptions = HashMap[AsciiBuffer, AsciiBuffer]()
       var pending_sends = HashMap[Long, (StompFrame, ()=>Unit)]()
 
-      var refiller: Runnable = ^{ sys.error("refiller not set") }
-      var receive_handler: (StompFrame)=>Boolean = frame => {
+      var receive_handler: (StompFrame)=>Unit = frame => {
         info("dropping frame: %s", frame)
-        true
       }
 
 
@@ -107,8 +128,8 @@ class StompBridgingStrategy(val manager:
           val to_stomp = new Stomp()
           to_stomp.setDispatchQueue(dispatch_queue)
           to_stomp.setRemoteURI(uri)
-          to_stomp.setLogin("admin")
-          to_stomp.setPasscode("password")
+          to_stomp.setLogin(bridge_user)
+          to_stomp.setPasscode(bridge_password)
           to_stomp.setBlockingExecutor(Broker.BLOCKABLE_THREAD_POOL)
           val headers = new Properties()
           headers.put("client-type", "apollo-bridge")
@@ -143,15 +164,13 @@ class StompBridgingStrategy(val manager:
           debug("Bridge connected to: %s", uri)
           connection.receive(new org.fusesource.stomp.client.Callback[StompFrame] {
             override def onSuccess(value: StompFrame) = {
-              if( !receive_handler(value) ) {
-                connection.suspend()
-              }
+              receive_handler(value)
             }
             override def onFailure(value: Throwable) = {
               failed(value)
             }
           })
-          connection.refiller(refiller)
+          connection.resume()
 
           // Reconnect any subscriptions.
           subscriptions.keySet.foreach(subscribe(_))

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
Fri Jun  8 00:47:35 2012
@@ -31,7 +31,7 @@ public class ClusterMemberDTO {
     @XmlAttribute(name="id")
     public String id;
 
-    @XmlElementRef(name="service")
+    @XmlElement(name="service")
     public ArrayList<ClusterServiceDTO> services = new ArrayList<ClusterServiceDTO>();
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java
Fri Jun  8 00:47:35 2012
@@ -31,8 +31,17 @@ import java.util.ArrayList;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class NetworkManagerDTO extends CustomServiceDTO {
 
-//    @XmlElementRef(name="bridge")
-//    public ArrayList<BridgeDTO> bridges = new ArrayList<BridgeDTO>();
+    @XmlAttribute(name="user")
+    public String user;
+
+    @XmlAttribute(name="password")
+    public String password;
+
+    @XmlAttribute(name="self")
+    public String self;
+
+    @XmlAttribute(name="duplex")
+    public Boolean duplex;
 
     @XmlElement(name="member")
     public ArrayList<ClusterMemberDTO> members = new ArrayList<ClusterMemberDTO>();

Modified: activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml
Fri Jun  8 00:47:35 2012
@@ -26,13 +26,13 @@
   <web_admin bind="http://0.0.0.0:41000"/>
   <connector id="tcp" bind="tcp://0.0.0.0:41001"/>
 
-  <network_manager>
+  <network_manager self="broker1">
     <member id="broker1">
-      <service kind="web_admin" address="http://0.0.0.0:41000"/>
+      <service kind="web_admin" address="http://0.0.0.0:41000/api/json/broker/virtual-hosts/broker1/load-status"/>
       <service kind="stomp" address="tcp://0.0.0.0:41001"/>
     </member>
     <member id="broker2">
-      <service kind="web_admin" address="http://0.0.0.0:42000"/>
+      <service kind="web_admin" address="http://0.0.0.0:42000/api/json/broker/virtual-hosts/broker2/load-status"/>
       <service kind="stomp" address="tcp://0.0.0.0:42001"/>
     </member>
   </network_manager>

Modified: activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml
Fri Jun  8 00:47:35 2012
@@ -26,13 +26,13 @@
   <web_admin bind="http://0.0.0.0:42000"/>
   <connector id="tcp" bind="tcp://0.0.0.0:42001"/>
 
-  <network_manager>
+  <network_manager self="broker2">
     <member id="broker1">
-      <service kind="web_admin" address="http://0.0.0.0:41000"/>
+      <service kind="web_admin" address="http://0.0.0.0:41000/api/json/broker/virtual-hosts/broker1/load-status"/>
       <service kind="stomp" address="tcp://0.0.0.0:41001"/>
     </member>
     <member id="broker2">
-      <service kind="web_admin" address="http://0.0.0.0:42000"/>
+      <service kind="web_admin" address="http://0.0.0.0:42000/api/json/broker/virtual-hosts/broker2/load-status"/>
       <service kind="stomp" address="tcp://0.0.0.0:42001"/>
     </member>
   </network_manager>

Modified: activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
Fri Jun  8 00:47:35 2012
@@ -20,7 +20,11 @@ package org.apache.activemq.apollo.broke
 import org.scalatest.matchers.ShouldMatchers
 import org.scalatest.BeforeAndAfterEach
 import org.apache.activemq.apollo.broker.MultiBrokerTestSupport
-
+import javax.jms.Session._
+import org.fusesource.stomp.jms.{StompJmsDestination, StompJmsConnectionFactory}
+import collection.mutable.ListBuffer
+import javax.jms.{Message, TextMessage, Connection, ConnectionFactory}
+import java.util.concurrent.TimeUnit._
 
 class NetworkTest extends MultiBrokerTestSupport with ShouldMatchers with BeforeAndAfterEach
{
 
@@ -29,10 +33,55 @@ class NetworkTest extends MultiBrokerTes
     "xml:classpath:apollo-network-2.xml"
   )
 
-  test("basics") {
-    admins(0).broker should not be(null)
-    val config = admins(0).broker.config
-    admins(1).broker should not be(null)
+  val connections = ListBuffer[Connection]()
+
+  override protected def afterEach() = {
+    for( c <- connections ) {
+      try {
+        c.close()
+      } catch {
+        case ignore =>
+      }
+    }
+    connections.clear()
+  }
+
+  def create_connection(factory:ConnectionFactory) = {
+    val rc = factory.createConnection()
+    rc.start()
+    connections += rc
+    rc
+  }
+
+  def connection_factories = admins.map { admin =>
+    val rc = new StompJmsConnectionFactory();
+    rc.setBrokerURI("tcp://localhost:"+admin.port);
+    rc
+  }
+
+  def create_connections = connection_factories.map(create_connection(_))
+
+  def test_destination(kind:String="queue", name:String=testName) =
+    new StompJmsDestination("/"+kind+"/"+name.replaceAll("[^a-zA-Z0-9._-]", "_"))
+
+  def text(msg:Message) = msg match {
+    case msg:TextMessage => Some(msg.getText)
+    case _ => None
   }
 
+  test("forward one message") {
+    val connections = create_connections
+    
+    val s0 = connections(0).createSession(false, AUTO_ACKNOWLEDGE)
+    val p0 = s0.createProducer(test_destination())
+    p0.send(s0.createTextMessage("1"))
+
+    val s1 = connections(1).createSession(false, AUTO_ACKNOWLEDGE)
+    val c1 = s1.createConsumer(test_destination())
+    within(30, SECONDS) {
+      text(c1.receive()) should be(Some("1"))
+    }
+  }
+
+
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Fri Jun  8 00:47:35 2012
@@ -178,6 +178,10 @@ class StompProtocolHandler extends Proto
 
     override def start_from_tail = from_seq == -1
 
+    override def jms_selector = if(selector!=null){ selector._1 } else { null }
+
+    override def user = security_context.user
+
     var starting_seq:Long = 0L
     override def set_starting_seq(seq: Long):Unit = {
       starting_seq=seq

Copied: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CircularBuffer.scala
(from r1347662, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CircularBuffer.scala?p2=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CircularBuffer.scala&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java&r1=1347662&r2=1347848&rev=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CircularBuffer.scala
Fri Jun  8 00:47:35 2012
@@ -14,31 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.dto;
+package org.apache.activemq.apollo.util
 
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-import javax.xml.bind.annotation.*;
+import collection.mutable.ArrayBuffer
 
 /**
+ * <p>A circular buffer</p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="queue_consumer_link")
-@XmlAccessorType(XmlAccessType.FIELD)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class QueueConsumerLinkDTO extends LinkDTO {
-
-    public long position = 0;
-
-    public int acquired_count;
-    public long acquired_size;
-
-    public long total_ack_count;
-    public long total_nack_count;
-
-    /**
-     * What the consumer is currently waiting on
-     */
-    @XmlAttribute(name="waiting_on")
-	public String waiting_on;
+class CircularBuffer[T](max:Int) extends ArrayBuffer[T](max) {
+  
+  def max_size = max
+  private var pos = 0
+  
+  override def +=(elem: T): this.type = {
+    if( size < initialSize ) {
+      super.+=(elem)
+    } else {
+      evicted(this(pos))
+      this.update(pos, elem)
+      pos += 1
+      if( pos >= initialSize ) {
+        pos = 0
+      }
+    }
+    this
+  }
+
+  /**
+   * Subclasses can override this method so they can be
+   * notified when an element is being evicted from the circular
+   * buffer.
+   */
+  protected def evicted(elem:T) = {}
+  
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
Fri Jun  8 00:47:35 2012
@@ -334,17 +334,21 @@ abstract class Resource(parent:Resource=
     host.router.asInstanceOf[LocalRouter]
   }
 
-  def now = BrokerRegistry.list.headOption.map(_.now).getOrElse(System.currentTimeMillis())
+  def now = System.currentTimeMillis()
 
   protected def with_broker[T](func: (org.apache.activemq.apollo.broker.Broker)=>FutureResult[T]):FutureResult[T]
= {
-    BrokerRegistry.list.headOption match {
-      case Some(broker)=>
-        sync(broker) {
-          func(broker)
-        }
-      case None=>
-        result(NOT_FOUND)
-    }
+    val broker = http_request.getAttribute("APOLLO_BROKER").asInstanceOf[Broker]
+    sync(broker) {
+      func(broker)
+    }
+//    BrokerRegistry.list.headOption match {
+//      case Some(broker)=>
+//        sync(broker) {
+//          func(broker)
+//        }
+//      case None=>
+//        result(NOT_FOUND)
+//    }
   }
 
   protected def with_connector[T](id:String)(func: (org.apache.activemq.apollo.broker.Connector)=>FutureResult[T]):FutureResult[T]
= {

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
(original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
Fri Jun  8 00:47:35 2012
@@ -114,6 +114,10 @@ ul
       p acks: #{x.total_ack_count} messages
       p naks: #{x.total_nack_count} messages
       p waiting on: #{x.waiting_on}
+      p acks per second: #{x.ack_item_rate}
+      p bytes acked per second: #{x.ack_size_rate}
+
+
 
 - if ( entries.isEmpty )
   h2



Mime
View raw message