Author: chirino
Date: Fri Jun 8 00:47:35 2012
New Revision: 1347848
URL: http://svn.apache.org/viewvc?rev=1347848&view=rev
Log:
Making more progress on implementing network bridging. Test case is now passing a message
between Apollo brokers via an automatically deployed bridge due to consumer demand.
Added:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConsumerLoadDTO.java
- copied, changed from r1347662, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CircularBuffer.scala
- copied, changed from r1347662, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
activemq/activemq-apollo/trunk/apollo-network/pom.xml
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java
activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml
activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml
activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Fri Jun 8 00:47:35 2012
@@ -65,6 +65,8 @@ trait DeliveryConsumer extends Retained
def start_from_tail = false
def set_starting_seq(seq:Long) = {}
+ def user:String = null
+ def jms_selector:String = null
def browser = false
def exclusive = false
def dispatch_queue:DispatchQueue;
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Jun 8 00:47:35 2012
@@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit
import org.fusesource.hawtdispatch._
import protocol.ProtocolFactory
-import collection.mutable.ListBuffer
import org.apache.activemq.apollo.broker.store._
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.util.list._
@@ -33,6 +32,8 @@ import security.{SecuredResource, Securi
import org.apache.activemq.apollo.dto._
import org.fusesource.hawtbuf._
import java.util.regex.Pattern
+import java.util.ArrayList
+import collection.mutable.{ArrayBuffer, ListBuffer}
sealed trait FullDropPolicy
object Block extends FullDropPolicy
@@ -317,10 +318,19 @@ class Queue(val router: LocalRouter, val
rc.message_size_enqueue_counter = enqueue_size_counter
rc.message_count_dequeue_counter = dequeue_item_counter
rc.message_size_dequeue_counter = dequeue_size_counter
-// TODO: expose selector attribute of consumer.
-// for( consumer <- all_subscriptions.keys ) {
-// rc.consumer_selectors.add(consumer.selector)
-// }
+
+ for( sub <- all_subscriptions.values ) {
+ val dto = new ConsumerLoadDTO
+ dto.user = sub.consumer.user
+ dto.selector = sub.consumer.jms_selector
+ sub.ack_rates match {
+ case Some((items_per_sec, size_per_sec) ) =>
+ dto.ack_item_rate = items_per_sec
+ dto.ack_size_rate = size_per_sec
+ case _ =>
+ }
+ rc.consumers.add(dto)
+ }
rc
}
def status(entries:Boolean=false) = {
@@ -391,7 +401,14 @@ class Queue(val router: LocalRouter, val
link.total_nack_count = sub.total_nack_count
link.acquired_size = sub.acquired_size
link.acquired_count = sub.acquired_count
- link.waiting_on = if( sub.full ) {
+ sub.ack_rates match {
+ case Some((items_per_sec, size_per_sec) ) =>
+ link.ack_item_rate = items_per_sec
+ link.ack_size_rate = size_per_sec
+ case _ =>
+ }
+
+ if( sub.full ) {
"consumer"
} else if( sub.pos.is_tail ) {
"producer"
@@ -472,7 +489,10 @@ class Queue(val router: LocalRouter, val
// by the time this is run, consumers and producers may have already joined.
on_completed.run
- schedule_periodic_maintenance
+ schedule_reoccurring(1, TimeUnit.SECONDS) {
+ queue_maintenance
+ }
+
// wake up the producers to fill us up...
if (messages.refiller != null) {
messages.refiller.run
@@ -895,56 +915,53 @@ class Queue(val router: LocalRouter, val
var delivery_rate = 0L
def swapped_out_size = queue_size - (producer_swapped_in.size + consumer_swapped_in.size)
- def schedule_periodic_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
- if( service_state.is_started ) {
- var elapsed = System.currentTimeMillis-now
- now += elapsed
+ def queue_maintenance:Unit = {
+ var elapsed = System.currentTimeMillis-now
+ now += elapsed
- consumers_keeping_up_historically = consumers_keeping_up_counter!=0
- consumers_keeping_up_counter = 0
-
- delivery_rate = 0L
+ consumers_keeping_up_historically = consumers_keeping_up_counter!=0
+ consumers_keeping_up_counter = 0
- var consumer_stall_ms = 0L
- var load_stall_ms = 0L
+ delivery_rate = 0L
- all_subscriptions.values.foreach{ sub=>
- val (cs, ls) = sub.adjust_prefetch_size
- consumer_stall_ms += cs
- load_stall_ms += ls
- if(!sub.browser) {
- delivery_rate += sub.enqueue_size_per_interval
- }
+ var consumer_stall_ms = 0L
+ var load_stall_ms = 0L
+
+ all_subscriptions.values.foreach{ sub=>
+ val (cs, ls) = sub.adjust_prefetch_size
+ consumer_stall_ms += cs
+ load_stall_ms += ls
+ if(!sub.browser) {
+ delivery_rate += sub.enqueue_size_per_interval
}
-
- val rate_adjustment = elapsed.toFloat / 1000.toFloat
- delivery_rate = (delivery_rate / rate_adjustment).toLong
+ }
- val stall_ratio = ((consumer_stall_ms*100)+1).toFloat / ((load_stall_ms*100)+1).toFloat
+ val rate_adjustment = elapsed.toFloat / 1000.toFloat
+ delivery_rate = (delivery_rate / rate_adjustment).toLong
- // Figure out what the max enqueue rate should be.
- max_enqueue_rate = Int.MaxValue
- if( tune_fast_delivery_rate>=0 && tune_catchup_enqueue_rate>=0 &&
delivery_rate>tune_fast_delivery_rate && swapped_out_size > 0 && stall_ratio
< 1.0 ) {
- max_enqueue_rate = tune_catchup_enqueue_rate
- }
- if(tune_max_enqueue_rate >=0 ) {
- max_enqueue_rate = max_enqueue_rate.min(tune_max_enqueue_rate)
- }
- if( max_enqueue_rate < Int.MaxValue ) {
- if(enqueues_remaining==null) {
- enqueues_remaining = new LongCounter()
- enqueue_throttle_release(enqueues_remaining)
- }
- } else {
- if(enqueues_remaining!=null) {
- enqueues_remaining = null
- }
- }
+ val stall_ratio = ((consumer_stall_ms*100)+1).toFloat / ((load_stall_ms*100)+1).toFloat
- swap_messages
- check_idle
- schedule_periodic_maintenance
+ // Figure out what the max enqueue rate should be.
+ max_enqueue_rate = Int.MaxValue
+ if( tune_fast_delivery_rate>=0 && tune_catchup_enqueue_rate>=0 &&
delivery_rate>tune_fast_delivery_rate && swapped_out_size > 0 && stall_ratio
< 1.0 ) {
+ max_enqueue_rate = tune_catchup_enqueue_rate
}
+ if(tune_max_enqueue_rate >=0 ) {
+ max_enqueue_rate = max_enqueue_rate.min(tune_max_enqueue_rate)
+ }
+ if( max_enqueue_rate < Int.MaxValue ) {
+ if(enqueues_remaining==null) {
+ enqueues_remaining = new LongCounter()
+ enqueue_throttle_release(enqueues_remaining)
+ }
+ } else {
+ if(enqueues_remaining!=null) {
+ enqueues_remaining = null
+ }
+ }
+
+ swap_messages
+ check_idle
}
var max_enqueue_rate = Int.MaxValue
@@ -1029,6 +1046,7 @@ class Queue(val router: LocalRouter, val
def drain_acks = might_unfill {
+ val end = System.nanoTime()
ack_source.getData.foreach {
case (entry, consumed, uow) =>
consumed match {
@@ -2194,8 +2212,29 @@ class Subscription(val queue:Queue, val
var consumer_stall_start = 0L
var load_stall_start = 0L
+ var started_at = Broker.now
var total_ack_count = 0L
+ var total_ack_size = 0L
var total_nack_count = 0L
+
+ var idle_start = System.nanoTime()
+ var idle_total = 0L
+
+ def ack_rates = {
+ var duration = ((Broker.now - started_at)*1000000)
+ duration -= idle_total
+ if( idle_start!=0 ) {
+ duration -= System.nanoTime() - idle_start
+ }
+
+ if( duration != 0 && total_ack_count > 0 ) {
+ val ack_rate = 1000000000d * total_ack_count / duration
+ val ack_size_rate = 1000000000d * total_ack_size / duration
+ Some((ack_rate, ack_size_rate))
+ } else {
+ None
+ }
+ }
override def toString = {
def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
@@ -2356,7 +2395,6 @@ class Subscription(val queue:Queue, val
}
def adjust_prefetch_size = {
-
enqueue_size_per_interval = session.enqueue_size_counter - enqueue_size_at_last_interval
enqueue_size_at_last_interval = session.enqueue_size_counter
@@ -2410,6 +2448,11 @@ class Subscription(val queue:Queue, val
class AcquiredQueueEntry(val entry:QueueEntry) extends LinkedNode[AcquiredQueueEntry] {
+ if(acquired.isEmpty) {
+ idle_total = System.nanoTime() - idle_start
+ idle_start = 0
+ }
+
acquired.addLast(this)
acquired_size += entry.size
@@ -2421,6 +2464,7 @@ class Subscription(val queue:Queue, val
}
total_ack_count += 1
+ total_ack_size += entry.size
if (entry.messageKey != -1) {
val storeBatch = if( uow == null ) {
queue.virtual_host.store.create_uow
@@ -2438,6 +2482,9 @@ class Subscription(val queue:Queue, val
// removes this entry from the acquired list.
unlink()
+ if( acquired.isEmpty ) {
+ idle_start = System.nanoTime()
+ }
// we may now be able to prefetch some messages..
acquired_size -= entry.size
@@ -2492,6 +2539,9 @@ class Subscription(val queue:Queue, val
}
unlink()
+ if( acquired.isEmpty ) {
+ idle_start = System.nanoTime()
+ }
check_finish_close
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/JettyWebServer.scala
Fri Jun 8 00:47:35 2012
@@ -34,6 +34,7 @@ import java.util.jar.JarInputStream
import java.lang.String
import org.eclipse.jetty.servlet.{FilterMapping, FilterHolder}
import org.apache.activemq.apollo.broker.web.{AllowAnyOriginFilter, WebServer, WebServerFactory}
+import javax.servlet._
/**
* <p>
@@ -226,6 +227,14 @@ class JettyWebServer(val broker:Broker)
val origins = cors_origin.split(",").map(_.trim()).toSet
context.addFilter(new FilterHolder(new AllowAnyOriginFilter(origins)), "/*",
FilterMapping.DEFAULT)
}
+ context.addFilter(new FilterHolder(new Filter(){
+ def init(p1: FilterConfig) {}
+ def destroy() {}
+ def doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain)
= {
+ request.setAttribute("APOLLO_BROKER", broker)
+ chain.doFilter(request, response)
+ }
+ }), "/*", FilterMapping.DEFAULT)
if( broker.tmp !=null ) {
context.setTempDirectory(broker.tmp)
Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConsumerLoadDTO.java
(from r1347662, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConsumerLoadDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConsumerLoadDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java&r1=1347662&r2=1347848&rev=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConsumerLoadDTO.java
Fri Jun 8 00:47:35 2012
@@ -14,31 +14,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.activemq.apollo.dto;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import javax.xml.bind.annotation.*;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.HashSet;
/**
+ * The
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name="queue_consumer_link")
+@XmlRootElement(name="destination_load")
@XmlAccessorType(XmlAccessType.FIELD)
@JsonIgnoreProperties(ignoreUnknown = true)
-public class QueueConsumerLinkDTO extends LinkDTO {
+public class ConsumerLoadDTO {
+
+ @XmlAttribute(name="user")
+ public String user;
+
+ @XmlAttribute(name="selector")
+ public String selector;
- public long position = 0;
+ @XmlAttribute(name="ack_item_rate")
+ public Double ack_item_rate;
- public int acquired_count;
- public long acquired_size;
+ @XmlAttribute(name="ack_size_rate")
+ public Double ack_size_rate;
- public long total_ack_count;
- public long total_nack_count;
-
- /**
- * What the consumer is currently waiting on
- */
- @XmlAttribute(name="waiting_on")
- public String waiting_on;
-}
\ No newline at end of file
+}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java
Fri Jun 8 00:47:35 2012
@@ -55,6 +55,6 @@ public class DestinationLoadDTO {
@XmlAttribute(name="message_size_dequeue_counter")
public Long message_size_dequeue_counter;
- public HashSet<String> consumer_selectors = new HashSet<String>();
+ public ArrayList<ConsumerLoadDTO> consumers = new ArrayList<ConsumerLoadDTO>();
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
Fri Jun 8 00:47:35 2012
@@ -36,6 +36,9 @@ public class QueueConsumerLinkDTO extend
public long total_ack_count;
public long total_nack_count;
+ public Double ack_item_rate;
+ public Double ack_size_rate;
+
/**
* What the consumer is currently waiting on
*/
Modified: activemq/activemq-apollo/trunk/apollo-network/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/pom.xml?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-network/pom.xml Fri Jun 8 00:47:35 2012
@@ -151,6 +151,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ <version>1.1</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
Fri Jun 8 00:47:35 2012
@@ -48,12 +48,12 @@ class RestLoadMonitor extends BaseServic
var poll_interval = 5*1000;
protected def _start(on_completed: Task) = {
- on_completed.run()
schedule_reoccurring(1, SECONDS) {
for(monitor <- members.values) {
monitor.poll
}
}
+ on_completed.run()
}
protected def _stop(on_completed: Task) = {
@@ -106,10 +106,12 @@ class RestLoadMonitor extends BaseServic
}
}
- def add(member: ClusterMemberDTO) = dispatch_queue {
- for(service <- member.services) {
- if( service.kind == "webadmin" ) {
- members.put(member.id, LoadMonitor(member.id, new URL(service.address)))
+ def add(member: ClusterMemberDTO) = {
+ dispatch_queue {
+ for(service <- member.services) {
+ if( service.kind == "web_admin" ) {
+ members.put(member.id, LoadMonitor(member.id, new URL(service.address)))
+ }
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
Fri Jun 8 00:47:35 2012
@@ -39,10 +39,10 @@ class CounterDrivenRate {
}
class DestinationMetrics {
- var consumer_count = 0L
var message_size = 0L
val enqueue_size_rate = new CounterDrivenRate()
- val dequeue_size_rate = new CounterDrivenRate()
+ var consumer_count = 0L
+ var dequeue_size_rate = 0d
}
class BrokerMetrics() {
@@ -52,7 +52,7 @@ class BrokerMetrics() {
// var topic_load = HashMap[String, DestinationMetrics]()
var timestamp = System.currentTimeMillis()
- def update(current:LoadStatusDTO) = {
+ def update(current:LoadStatusDTO, network_user:String) = {
val now = System.currentTimeMillis()
val duration = (now - timestamp)/1000.0d
timestamp = now
@@ -61,8 +61,18 @@ class BrokerMetrics() {
for( dest <- current.queues ) {
val dest_load = queue_load.get(dest.id).getOrElse(new DestinationMetrics())
dest_load.message_size = dest.message_size
+
+ // Lets not include the network consumers in the the consumer rates..
+ val consumers = dest.consumers.filter(_.user == network_user).toArray
+
+ dest_load.consumer_count = consumers.size
+ dest_load.dequeue_size_rate = 0
+ for( c <- consumers ) {
+ if( c.ack_size_rate !=null ) {
+ dest_load.dequeue_size_rate += c.ack_size_rate
+ }
+ }
dest_load.enqueue_size_rate.update(dest.message_size_enqueue_counter, duration)
- dest_load.dequeue_size_rate.update(dest.message_size_dequeue_counter, duration)
next_queue_load += dest.id -> dest_load
}
queue_load = next_queue_load
Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
Fri Jun 8 00:47:35 2012
@@ -93,7 +93,7 @@ class NetworkManager(broker: Broker) ext
}
def on_load_change(dto: LoadStatusDTO) = dispatch_queue {
- metrics_map.getOrElseUpdate(dto.id, new BrokerMetrics()).update(dto)
+ metrics_map.getOrElseUpdate(dto.id, new BrokerMetrics()).update(dto, config.user)
}
def load_analysis = {
@@ -113,15 +113,16 @@ class NetworkManager(broker: Broker) ext
val queue_demand_map = HashMap[String, DemandStatus]()
for( (broker, broker_load) <- metrics_map) {
- for( (id, dest) <- broker_load.queue_load ) {
- val status = queue_demand_map.getOrElseUpdate(id, new DemandStatus)
- if( can_bridge_from(broker) && needs_more_consumers(dest) ) {
+ for( (dest_name, dest) <- broker_load.queue_load ) {
+ val status = queue_demand_map.getOrElseUpdate(dest_name, new DemandStatus)
+ var needsmoreconsumers = needs_more_consumers(dest)
+ if( can_bridge_from(broker) && needsmoreconsumers ) {
// The broker needs more consumers to drain the queue..
- status.needs_consumers += (id->dest)
+ status.needs_consumers += (broker->dest)
} else {
// The broker can drain the queue of other brokers..
if( can_bridge_to(broker) && dest.consumer_count > 0 ) {
- status.has_consumers += (id->dest)
+ status.has_consumers += (broker->dest)
}
}
}
@@ -129,7 +130,7 @@ class NetworkManager(broker: Broker) ext
val desired_bridges = HashSet[BridgeInfo]()
for( (id, demand) <- queue_demand_map ) {
- for( (to, to_metrics)<- demand.needs_consumers; (from, from_metrics) <-demand.has_consumers
) {
+ for( (from, from_metrics)<- demand.needs_consumers; (to, to_metrics) <-demand.has_consumers
) {
// we could get fancy and compare the to_metrics and from_metrics to avoid
// setting up bridges that won't make a big difference..
desired_bridges += BridgeInfo(from, to, "queue", id)
@@ -152,13 +153,12 @@ class NetworkManager(broker: Broker) ext
}
- var local_broker_id = ""
- var enable_duplex = false
+ def local_broker_id = config.self
def can_bridge_from(broker:String):Boolean = broker==local_broker_id
def can_bridge_to(broker:String):Boolean = {
if ( broker == local_broker_id) {
- enable_duplex
+ OptionSupport(config.duplex).getOrElse(false)
} else {
true
}
@@ -171,7 +171,7 @@ class NetworkManager(broker: Broker) ext
return false
}
- val drain_rate = dest.dequeue_size_rate.mean - dest.enqueue_size_rate.mean
+ val drain_rate = dest.dequeue_size_rate - dest.enqueue_size_rate.mean
if( drain_rate < 0 ) {
// Not draining...
return true
@@ -204,7 +204,7 @@ class NetworkManager(broker: Broker) ext
// Lets look to see if we can use the strategy with services exposed by the broker..
for( to_service <- to.services; from_service <- from.services ) {
- if( to_service.kind==service_kind && to_service.kind==from_service.kind
) {
+ if( bridging_strategy==null && to_service.kind==service_kind &&
to_service.kind==from_service.kind ) {
bridging_strategy = strategy
bridging_strategy_info = BridgeInfo(from_service.address, to_service.address,
info.kind, info.dest)
bridging_strategy.deploy( bridging_strategy_info )
Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
Fri Jun 8 00:47:35 2012
@@ -21,12 +21,12 @@ import org.fusesource.hawtdispatch._
import java.net.URI
import org.apache.activemq.apollo.util.{StateMachine, Log}
import org.fusesource.hawtbuf.AsciiBuffer
-import org.fusesource.stomp.codec.StompFrame
import org.apache.activemq.apollo.broker.Broker
import java.util.Properties
import org.fusesource.stomp.client.{CallbackConnection, Stomp}
import java.util.concurrent.TimeUnit
import collection.mutable.HashMap
+import org.fusesource.stomp.codec.StompFrame
object StompBridgingStrategy extends Log {
@@ -40,6 +40,9 @@ class StompBridgingStrategy(val manager:
val bridges = HashMap[(String, String), Bridge]()
+ def bridge_user = manager.config.user
+ def bridge_password = manager.config.password
+
def deploy(info:BridgeInfo) = {
dispatch_queue.assertExecuting()
val bridge = bridges.getOrElseUpdate((info.from, info.to), new Bridge(info.from, info.to))
@@ -60,8 +63,28 @@ class StompBridgingStrategy(val manager:
val from_connection = ConnectionStateMachine(new URI(from))
val to_connection = ConnectionStateMachine(new URI(to))
- from_connection.refiller = ^{
-
+ from_connection.receive_handler = frame => {
+ val original_state = from_connection.state
+ frame.action() match {
+ case MESSAGE =>
+ // forward it..
+ frame.action(SEND)
+ println("forwarding message: "+frame.getHeader(MESSAGE_ID))
+ to_connection.send(frame, ()=>{
+ // Ack it if the original connection is still up...
+ // TODO: if it's not a we will probably get a dup/redelivery.
+ // Might want to introduce some dup detection at this point.
+ if( from_connection.state eq original_state ) {
+ val ack = new StompFrame(ACK);
+ ack.addHeader(SUBSCRIPTION, frame.getHeader(SUBSCRIPTION))
+ ack.addHeader(MESSAGE_ID, frame.getHeader(MESSAGE_ID))
+ from_connection.send(ack, null)
+ println("forwarded message, now acking: "+frame.getHeader(MESSAGE_ID))
+ }
+ })
+ case _ =>
+ println("unhandled stomp frame: "+frame)
+ }
}
dispatch_queue {
@@ -75,10 +98,8 @@ class StompBridgingStrategy(val manager:
var subscriptions = HashMap[AsciiBuffer, AsciiBuffer]()
var pending_sends = HashMap[Long, (StompFrame, ()=>Unit)]()
- var refiller: Runnable = ^{ sys.error("refiller not set") }
- var receive_handler: (StompFrame)=>Boolean = frame => {
+ var receive_handler: (StompFrame)=>Unit = frame => {
info("dropping frame: %s", frame)
- true
}
@@ -107,8 +128,8 @@ class StompBridgingStrategy(val manager:
val to_stomp = new Stomp()
to_stomp.setDispatchQueue(dispatch_queue)
to_stomp.setRemoteURI(uri)
- to_stomp.setLogin("admin")
- to_stomp.setPasscode("password")
+ to_stomp.setLogin(bridge_user)
+ to_stomp.setPasscode(bridge_password)
to_stomp.setBlockingExecutor(Broker.BLOCKABLE_THREAD_POOL)
val headers = new Properties()
headers.put("client-type", "apollo-bridge")
@@ -143,15 +164,13 @@ class StompBridgingStrategy(val manager:
debug("Bridge connected to: %s", uri)
connection.receive(new org.fusesource.stomp.client.Callback[StompFrame] {
override def onSuccess(value: StompFrame) = {
- if( !receive_handler(value) ) {
- connection.suspend()
- }
+ receive_handler(value)
}
override def onFailure(value: Throwable) = {
failed(value)
}
})
- connection.refiller(refiller)
+ connection.resume()
// Reconnect any subscriptions.
subscriptions.keySet.foreach(subscribe(_))
Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
Fri Jun 8 00:47:35 2012
@@ -31,7 +31,7 @@ public class ClusterMemberDTO {
@XmlAttribute(name="id")
public String id;
- @XmlElementRef(name="service")
+ @XmlElement(name="service")
public ArrayList<ClusterServiceDTO> services = new ArrayList<ClusterServiceDTO>();
}
Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java
Fri Jun 8 00:47:35 2012
@@ -31,8 +31,17 @@ import java.util.ArrayList;
@JsonIgnoreProperties(ignoreUnknown = true)
public class NetworkManagerDTO extends CustomServiceDTO {
-// @XmlElementRef(name="bridge")
-// public ArrayList<BridgeDTO> bridges = new ArrayList<BridgeDTO>();
+ @XmlAttribute(name="user")
+ public String user;
+
+ @XmlAttribute(name="password")
+ public String password;
+
+ @XmlAttribute(name="self")
+ public String self;
+
+ @XmlAttribute(name="duplex")
+ public Boolean duplex;
@XmlElement(name="member")
public ArrayList<ClusterMemberDTO> members = new ArrayList<ClusterMemberDTO>();
Modified: activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml
Fri Jun 8 00:47:35 2012
@@ -26,13 +26,13 @@
<web_admin bind="http://0.0.0.0:41000"/>
<connector id="tcp" bind="tcp://0.0.0.0:41001"/>
- <network_manager>
+ <network_manager self="broker1">
<member id="broker1">
- <service kind="web_admin" address="http://0.0.0.0:41000"/>
+ <service kind="web_admin" address="http://0.0.0.0:41000/api/json/broker/virtual-hosts/broker1/load-status"/>
<service kind="stomp" address="tcp://0.0.0.0:41001"/>
</member>
<member id="broker2">
- <service kind="web_admin" address="http://0.0.0.0:42000"/>
+ <service kind="web_admin" address="http://0.0.0.0:42000/api/json/broker/virtual-hosts/broker2/load-status"/>
<service kind="stomp" address="tcp://0.0.0.0:42001"/>
</member>
</network_manager>
Modified: activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml
Fri Jun 8 00:47:35 2012
@@ -26,13 +26,13 @@
<web_admin bind="http://0.0.0.0:42000"/>
<connector id="tcp" bind="tcp://0.0.0.0:42001"/>
- <network_manager>
+ <network_manager self="broker2">
<member id="broker1">
- <service kind="web_admin" address="http://0.0.0.0:41000"/>
+ <service kind="web_admin" address="http://0.0.0.0:41000/api/json/broker/virtual-hosts/broker1/load-status"/>
<service kind="stomp" address="tcp://0.0.0.0:41001"/>
</member>
<member id="broker2">
- <service kind="web_admin" address="http://0.0.0.0:42000"/>
+ <service kind="web_admin" address="http://0.0.0.0:42000/api/json/broker/virtual-hosts/broker2/load-status"/>
<service kind="stomp" address="tcp://0.0.0.0:42001"/>
</member>
</network_manager>
Modified: activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
Fri Jun 8 00:47:35 2012
@@ -20,7 +20,11 @@ package org.apache.activemq.apollo.broke
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterEach
import org.apache.activemq.apollo.broker.MultiBrokerTestSupport
-
+import javax.jms.Session._
+import org.fusesource.stomp.jms.{StompJmsDestination, StompJmsConnectionFactory}
+import collection.mutable.ListBuffer
+import javax.jms.{Message, TextMessage, Connection, ConnectionFactory}
+import java.util.concurrent.TimeUnit._
class NetworkTest extends MultiBrokerTestSupport with ShouldMatchers with BeforeAndAfterEach
{
@@ -29,10 +33,55 @@ class NetworkTest extends MultiBrokerTes
"xml:classpath:apollo-network-2.xml"
)
- test("basics") {
- admins(0).broker should not be(null)
- val config = admins(0).broker.config
- admins(1).broker should not be(null)
+ val connections = ListBuffer[Connection]()
+
+ override protected def afterEach() = {
+ for( c <- connections ) {
+ try {
+ c.close()
+ } catch {
+ case ignore =>
+ }
+ }
+ connections.clear()
+ }
+
+ def create_connection(factory:ConnectionFactory) = {
+ val rc = factory.createConnection()
+ rc.start()
+ connections += rc
+ rc
+ }
+
+ def connection_factories = admins.map { admin =>
+ val rc = new StompJmsConnectionFactory();
+ rc.setBrokerURI("tcp://localhost:"+admin.port);
+ rc
+ }
+
+ def create_connections = connection_factories.map(create_connection(_))
+
+ def test_destination(kind:String="queue", name:String=testName) =
+ new StompJmsDestination("/"+kind+"/"+name.replaceAll("[^a-zA-Z0-9._-]", "_"))
+
+ def text(msg:Message) = msg match {
+ case msg:TextMessage => Some(msg.getText)
+ case _ => None
}
+ test("forward one message") {
+ val connections = create_connections
+
+ val s0 = connections(0).createSession(false, AUTO_ACKNOWLEDGE)
+ val p0 = s0.createProducer(test_destination())
+ p0.send(s0.createTextMessage("1"))
+
+ val s1 = connections(1).createSession(false, AUTO_ACKNOWLEDGE)
+ val c1 = s1.createConsumer(test_destination())
+ within(30, SECONDS) {
+ text(c1.receive()) should be(Some("1"))
+ }
+ }
+
+
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Fri Jun 8 00:47:35 2012
@@ -178,6 +178,10 @@ class StompProtocolHandler extends Proto
override def start_from_tail = from_seq == -1
+ override def jms_selector = if(selector!=null){ selector._1 } else { null }
+
+ override def user = security_context.user
+
var starting_seq:Long = 0L
override def set_starting_seq(seq: Long):Unit = {
starting_seq=seq
Copied: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CircularBuffer.scala
(from r1347662, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CircularBuffer.scala?p2=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CircularBuffer.scala&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java&r1=1347662&r2=1347848&rev=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CircularBuffer.scala
Fri Jun 8 00:47:35 2012
@@ -14,31 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.dto;
+package org.apache.activemq.apollo.util
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-import javax.xml.bind.annotation.*;
+import collection.mutable.ArrayBuffer
/**
+ * <p>A circular buffer</p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name="queue_consumer_link")
-@XmlAccessorType(XmlAccessType.FIELD)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class QueueConsumerLinkDTO extends LinkDTO {
-
- public long position = 0;
-
- public int acquired_count;
- public long acquired_size;
-
- public long total_ack_count;
- public long total_nack_count;
-
- /**
- * What the consumer is currently waiting on
- */
- @XmlAttribute(name="waiting_on")
- public String waiting_on;
+class CircularBuffer[T](max:Int) extends ArrayBuffer[T](max) {
+
+ def max_size = max
+ private var pos = 0
+
+ override def +=(elem: T): this.type = {
+ if( size < initialSize ) {
+ super.+=(elem)
+ } else {
+ evicted(this(pos))
+ this.update(pos, elem)
+ pos += 1
+ if( pos >= initialSize ) {
+ pos = 0
+ }
+ }
+ this
+ }
+
+ /**
+ * Sub classes can override this method to so they can be
+ * notified when an element is being evicted from the circular
+ * buffer.
+ */
+ protected def evicted(elem:T) = {}
+
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
Fri Jun 8 00:47:35 2012
@@ -334,17 +334,21 @@ abstract class Resource(parent:Resource=
host.router.asInstanceOf[LocalRouter]
}
- def now = BrokerRegistry.list.headOption.map(_.now).getOrElse(System.currentTimeMillis())
+ def now = System.currentTimeMillis()
protected def with_broker[T](func: (org.apache.activemq.apollo.broker.Broker)=>FutureResult[T]):FutureResult[T]
= {
- BrokerRegistry.list.headOption match {
- case Some(broker)=>
- sync(broker) {
- func(broker)
- }
- case None=>
- result(NOT_FOUND)
- }
+ val broker = http_request.getAttribute("APOLLO_BROKER").asInstanceOf[Broker]
+ sync(broker) {
+ func(broker)
+ }
+// BrokerRegistry.list.headOption match {
+// case Some(broker)=>
+// sync(broker) {
+// func(broker)
+// }
+// case None=>
+// result(NOT_FOUND)
+// }
}
protected def with_connector[T](id:String)(func: (org.apache.activemq.apollo.broker.Connector)=>FutureResult[T]):FutureResult[T]
= {
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1347848&r1=1347847&r2=1347848&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
(original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
Fri Jun 8 00:47:35 2012
@@ -114,6 +114,10 @@ ul
p acks: #{x.total_ack_count} messages
p naks: #{x.total_nack_count} messages
p waiting on: #{x.waiting_on}
+ p acks per second: #{x.ack_item_rate}
+ p bytes acked per second: #{x.ack_size_rate}
+
+
- if ( entries.isEmpty )
h2
|