Author: chirino
Date: Wed Jan 16 22:22:46 2013
New Revision: 1434448
URL: http://svn.apache.org/viewvc?rev=1434448&view=rev
Log:
Add message browsing support to the REST interface.
Added:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/MessageStatusDTO.java
- copied, changed from r1432955, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/css/app.css
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/index.html
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala Wed Jan 16 22:22:46 2013
@@ -25,7 +25,7 @@ import org.apache.activemq.apollo.broker
import org.fusesource.hawtbuf.Buffer
import org.fusesource.hawtbuf.AsciiBuffer
import org.fusesource.hawtbuf.UTF8Buffer
-import org.apache.qpid.proton.amqp.{UnsignedLong, UnsignedInteger}
+import org.apache.qpid.proton.amqp.{UnsignedByte, UnsignedShort, UnsignedLong, UnsignedInteger}
import org.apache.qpid.proton.amqp.messaging.{Properties, Header}
import org.apache.qpid.proton.message.impl.MessageImpl
@@ -247,6 +247,104 @@ class AmqpMessage(private var encoded_bu
rc
}
+
+ override def headers_as_json: java.util.HashMap[String, Object] = {
+ val rc = new java.util.HashMap[String, Object]()
+ import collection.JavaConversions._
+
+ def convert(v: AnyRef) = (v match {
+ case v: UnsignedByte => new java.lang.Integer(v.shortValue())
+ case v: UnsignedShort => new java.lang.Integer(v.intValue())
+ case v: UnsignedInteger => new java.lang.Long(v.longValue())
+ case v: UnsignedLong => new java.lang.Long(v.longValue())
+ case _ => v
+ })
+
+ if ( decoded.getHeader!=null ) {
+ val header = decoded.getHeader
+ if ( header.getDeliveryCount !=null ) {
+ rc.put("header.delivery_count", new java.lang.Long(header.getDeliveryCount.longValue()))
+ }
+ if ( header.getDurable !=null ) {
+ rc.put("header.durable", new java.lang.Boolean(header.getDurable.booleanValue()))
+ }
+ if ( header.getFirstAcquirer !=null ) {
+ rc.put("header.first_acquirer", new java.lang.Boolean(header.getFirstAcquirer.booleanValue()))
+ }
+ if ( header.getPriority !=null ) {
+ rc.put("header.priority", new java.lang.Integer(header.getPriority.intValue()))
+ }
+ if ( header.getTtl !=null ) {
+ rc.put("header.ttl", new java.lang.Long(header.getTtl.longValue()))
+ }
+ }
+
+ if( decoded.getProperties != null ) {
+ val properties = decoded.getProperties
+ if ( properties.getAbsoluteExpiryTime !=null ) {
+ rc.put("property.absolute_expiry_time", new java.lang.Long(properties.getAbsoluteExpiryTime.getTime()))
+ }
+ if ( properties.getContentEncoding !=null ) {
+ rc.put("property.content_encoding", properties.getContentEncoding.toString)
+ }
+ if ( properties.getContentType !=null ) {
+ rc.put("property.content_type", properties.getContentType.toString)
+ }
+ if ( properties.getCorrelationId !=null ) {
+ rc.put("property.correlation_id", properties.getCorrelationId.toString)
+ }
+ if ( properties.getCreationTime !=null ) {
+ rc.put("property.creation_time", new java.lang.Long(properties.getCreationTime.getTime))
+ }
+ if ( properties.getGroupId !=null ) {
+ rc.put("property.group_id", properties.getGroupId)
+ }
+ if ( properties.getGroupSequence !=null ) {
+ rc.put("property.group_sequence", new java.lang.Long(properties.getGroupSequence.longValue()))
+ }
+ if ( properties.getMessageId !=null ) {
+ rc.put("property.message_id", properties.getMessageId)
+ }
+ if ( properties.getReplyTo !=null ) {
+ rc.put("property.reply_to", properties.getReplyTo)
+ }
+ if ( properties.getReplyToGroupId !=null ) {
+ rc.put("property.reply_to_group_id", properties.getReplyToGroupId)
+ }
+ if ( properties.getSubject !=null ) {
+ rc.put("property.subject", properties.getSubject)
+ }
+ if ( properties.getTo !=null ) {
+ rc.put("property.to", properties.getTo)
+ }
+ if ( properties.getUserId !=null ) {
+ rc.put("property.user_id", properties.getUserId.toString)
+ }
+ }
+
+ if( decoded.getDeliveryAnnotations !=null ) {
+ val annotations = decoded.getDeliveryAnnotations
+ for( (k,v:AnyRef) <- annotations.getValue ) {
+ rc.put("annotation."+k, convert(v))
+ }
+ }
+
+ if( decoded.getApplicationProperties !=null ) {
+ val properties = decoded.getApplicationProperties
+ for( (k,v:AnyRef) <- properties.getValue ) {
+ rc.put("app."+k, convert(v))
+ }
+ }
+
+ if( decoded.getFooter !=null ) {
+ val footer = decoded.getFooter
+ for( (k,v:AnyRef) <- footer.getValue ) {
+ rc.put("footer."+k, convert(v))
+ }
+ }
+ rc
+ }
+
def release() {}
def retain() {}
def retained(): Int = 0
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jan 16 22:22:46 2013
@@ -120,6 +120,8 @@ trait Message extends Filterable with Re
*/
def codec:MessageCodec
+ def headers_as_json = new java.util.HashMap[String, Object]()
+
def encoded:Buffer = codec.encode(this).buffer
def message_group: String = null
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jan 16 22:22:46 2013
@@ -332,6 +332,41 @@ class Queue(val router: LocalRouter, val
rc
}
+ def browse(from_seq:Long, to:Option[Long], max:Long)(func: (Array[(EntryStatusDTO, Delivery)])=>Unit):Unit = {
+ var result = ListBuffer[(EntryStatusDTO, Delivery)]()
+ def load_from(start:Long):Unit = {
+ assert_executing
+ var cur = head_entry.getNext
+ while(true) {
+ if( cur == null || result.size >= max || ( to.isDefined && cur.seq > to.get) ) {
+ func(result.toArray)
+ return
+ }
+ val next = cur.getNext
+ if ( cur.seq >= start ) {
+ cur.state match {
+ case state:QueueEntry#Loaded =>
+ result.append((create_entry_status(cur), state.delivery))
+ case state:QueueEntry#Swapped =>
+ state.swapped_in_watchers ::=(()=>{
+ load_from(cur.seq) // resume loading
+ })
+ cur.load(consumer_swapped_in)
+ return
+ case state:QueueEntry#SwappedRange =>
+ state.swapped_in_watchers ::=(()=>{
+ load_from(cur.seq)
+ })
+ cur.load(consumer_swapped_in)
+ return
+ }
+ }
+ cur = next
+ }
+ }
+ load_from(from_seq)
+ }
+
def status(entries:Boolean=false, include_producers:Boolean=false, include_consumers:Boolean=false) = {
val rc = new QueueStatusDTO
rc.id = this.id
@@ -353,17 +388,7 @@ class Queue(val router: LocalRouter, val
if( entries ) {
var cur = this.head_entry
while( cur!=null ) {
-
- val e = new EntryStatusDTO
- e.seq = cur.seq
- e.count = cur.count
- e.size = cur.size
- e.consumer_count = cur.parked.size
- e.is_prefetched = cur.prefetched
- e.state = cur.label
-
- rc.entries.add(e)
-
+ rc.entries.add(create_entry_status(cur))
cur = if( cur == this.tail_entry ) {
null
} else {
@@ -397,41 +422,7 @@ class Queue(val router: LocalRouter, val
if( include_consumers ) {
for( sub <- this.all_subscriptions.values ) {
- val link = new QueueConsumerLinkDTO
- sub.consumer.connection match {
- case Some(connection) =>
- link.kind = "connection"
- link.id = connection.id.toString
- link.label = connection.transport.getRemoteAddress.toString
- case _ =>
- link.kind = "unknown"
- link.label = "unknown"
- }
- link.position = sub.pos.seq
- link.enqueue_item_counter = sub.session.enqueue_item_counter
- link.enqueue_size_counter = sub.session.enqueue_size_counter
- link.enqueue_ts = sub.session.enqueue_ts
- link.total_ack_count = sub.total_ack_count
- link.total_nack_count = sub.total_nack_count
- link.acquired_size = sub.acquired_size
- link.acquired_count = sub.acquired_count
- sub.ack_rates match {
- case Some((items_per_sec, size_per_sec) ) =>
- link.ack_item_rate = items_per_sec
- link.ack_size_rate = size_per_sec
- case _ =>
- }
-
- link.waiting_on = if( sub.full ) {
- "consumer"
- } else if( sub.pos.is_tail ) {
- "producer"
- } else if( !sub.pos.is_loaded ) {
- "load"
- } else {
- "dispatch"
- }
- rc.consumers.add(link)
+ rc.consumers.add(sub.create_link_dto())
}
} else {
rc.consumers = null
@@ -439,6 +430,21 @@ class Queue(val router: LocalRouter, val
rc
}
+
+ def create_entry_status(cur: QueueEntry): EntryStatusDTO = {
+ val rc = new EntryStatusDTO
+ rc.seq = cur.seq
+ rc.count = cur.count
+ rc.size = cur.size
+ rc.consumer_count = cur.parked.size
+ rc.is_prefetched = cur.prefetched
+ rc.state = cur.label
+ if( cur.acquiring_subscription != null ) {
+ rc.acquirer = cur.acquiring_subscription.create_link_dto(false)
+ }
+ rc
+ }
+
def update(on_completed:Task) = dispatch_queue {
val prev_persistent = tune_persistent
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Wed Jan 16 22:22:46 2013
@@ -189,6 +189,7 @@ class QueueEntry(val queue:Queue, val se
def messageKey = state.message_key
def is_swapped_or_swapping_out = state.is_swapped_or_swapping_out
def is_acquired = state.is_acquired
+ def acquiring_subscription = state.acquiring_subscription
def dispatch() = state.dispatch
def memory_space = state.memory_space
@@ -296,7 +297,8 @@ class QueueEntry(val queue:Queue, val se
/**
* Is the entry acquired by a subscription.
*/
- def is_acquired = false
+ def is_acquired = acquiring_subscription!=null
+ def acquiring_subscription:Subscription = null
/**
* @returns true if the entry is either swapped or swapping.
@@ -399,7 +401,7 @@ class QueueEntry(val queue:Queue, val se
assert( delivery!=null, "delivery cannot be null")
var acquirer:Subscription = _
- override def is_acquired = acquirer!=null
+ override def acquiring_subscription = acquirer
override def memory_space = space
@@ -723,14 +725,13 @@ class QueueEntry(val queue:Queue, val se
override def redelivery_count = _redeliveries
override def redelivered = _redeliveries = ((_redeliveries+1).min(Short.MaxValue)).toShort
+ override def acquiring_subscription = acquirer
override def count = 1
override def as_swapped = this
override def is_swapped_or_swapping_out = true
- override def is_acquired = acquirer!=null
-
override def memory_space = space
def label = {
@@ -773,7 +774,6 @@ class QueueEntry(val queue:Queue, val se
}
}
-
def to_delivery = {
val delivery = new Delivery()
delivery.seq = seq
@@ -787,6 +787,13 @@ class QueueEntry(val queue:Queue, val se
delivery
}
+ var swapped_in_watchers = List[()=>Unit]()
+ def fire_swapped_in_watchers = {
+ val watchers = swapped_in_watchers
+ swapped_in_watchers = Nil
+ watchers.foreach(_())
+ }
+
def swapped_in(messageRecord:MessageRecord) = {
if( space!=null ) {
// debug("Loaded message seq: ", seq )
@@ -803,9 +810,8 @@ class QueueEntry(val queue:Queue, val se
queue.individual_swapped_items -= 1
state = new Loaded(delivery, true, space)
space = null
- } else {
-// debug("Ignoring store load of: ", messageKey)
}
+ fire_swapped_in_watchers
}
@@ -927,6 +933,13 @@ class QueueEntry(val queue:Queue, val se
}
override def toString = { "swapped_range:{ swapping_in: "+loading+", count: "+count+", size: "+size+"}" }
+ var swapped_in_watchers = List[()=>Unit]()
+ def fire_swapped_in_watchers = {
+ val watchers = swapped_in_watchers
+ swapped_in_watchers = Nil
+ watchers.foreach(_())
+ }
+
override def swap_in(space:MemorySpace):Unit = {
if( !loading ) {
loading = true
@@ -972,8 +985,8 @@ class QueueEntry(val queue:Queue, val se
parked.foreach(_.advance(next))
next :::= parked
queue.trigger_swap
-
unlink
+ fire_swapped_in_watchers
}
def attempt_load(attempt_counter:Int):Unit = {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Wed Jan 16 22:22:46 2013
@@ -21,6 +21,7 @@ import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.broker.store._
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.util.list._
+import org.apache.activemq.apollo.dto.QueueConsumerLinkDTO
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -49,6 +50,45 @@ class Subscription(val queue:Queue, val
var enqueue_size_per_interval = new CircularBuffer[Int](15)
+ def create_link_dto(include_metrics:Boolean=true) = {
+ val link = new QueueConsumerLinkDTO
+ consumer.connection match {
+ case Some(connection) =>
+ link.kind = "connection"
+ link.id = connection.id.toString
+ link.label = connection.transport.getRemoteAddress.toString
+ case _ =>
+ link.kind = "unknown"
+ link.label = "unknown"
+ }
+ if ( include_metrics ) {
+ link.position = pos.seq
+ link.enqueue_item_counter = session.enqueue_item_counter
+ link.enqueue_size_counter = session.enqueue_size_counter
+ link.enqueue_ts = session.enqueue_ts
+ link.total_ack_count = total_ack_count
+ link.total_nack_count = total_nack_count
+ link.acquired_size = acquired_size
+ link.acquired_count = acquired_count
+ ack_rates match {
+ case Some((items_per_sec, size_per_sec) ) =>
+ link.ack_item_rate = items_per_sec
+ link.ack_size_rate = size_per_sec
+ case _ =>
+ }
+ link.waiting_on = if( full ) {
+ "consumer"
+ } else if( pos.is_tail ) {
+ "producer"
+ } else if( !pos.is_loaded ) {
+ "load"
+ } else {
+ "dispatch"
+ }
+ }
+ link
+ }
+
def avg_enqueue_size_per_interval = {
var rc = 0
if( enqueue_size_per_interval.size > 0 ) {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Wed Jan 16 22:22:46 2013
@@ -22,7 +22,6 @@ import org.apache.activemq.apollo.dto._
import java.util.concurrent.TimeUnit
import org.fusesource.hawtdispatch._
import collection.mutable.{HashSet, HashMap, ListBuffer}
-import java.lang.Long
import security.SecuredResource
/**
@@ -323,6 +322,19 @@ class Topic(val router:LocalRouter, val
}
}
+ def browse(from_seq:Long, max:Long)(func: (Array[(EntryStatusDTO, Delivery)])=>Unit):Unit = {
+ val msg = retained_message
+ if ( msg==null ) {
+ func(Array())
+ } else {
+ val status = new EntryStatusDTO()
+ status.seq = retained_message.seq
+ status.size = retained_message.size
+ status.state = "loaded"
+ status.is_prefetched = true;
+ func(Array((status, retained_message)))
+ }
+ }
def update(on_completed:Task) = {
refresh_config
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala Wed Jan 16 22:22:46 2013
@@ -121,6 +121,8 @@ trait DecodedUdpMessage {
abstract class UdpProtocolHandler extends ProtocolHandler {
import UdpProtocolHandler._
type ConfigTypeDTO <: ProtocolDTO
+ def configClass:Class[ConfigTypeDTO]
+
def protocol = "udp"
var session_id:Option[String] = None
@@ -148,10 +150,13 @@ abstract class UdpProtocolHandler extend
configure(connection.connector.config match {
case connector_config:AcceptingConnectorDTO =>
- connector_config.protocols.flatMap{ _ match {
- case x:ConfigTypeDTO => Some(x)
- case _ => None
- }}.headOption
+ connector_config.protocols.flatMap{ x=>
+ if( x.getClass == configClass) {
+ Some(x.asInstanceOf[ConfigTypeDTO])
+ } else {
+ None
+ }
+ }.headOption
case _ => None
})
}
@@ -290,6 +295,7 @@ class UdpProtocol extends BaseProtocol {
def createProtocolHandler:ProtocolHandler = new UdpProtocolHandler {
type ConfigTypeDTO = UdpDTO
+ def configClass = classOf[ConfigTypeDTO]
var default_host:VirtualHost = _
var topic_address:AsciiBuffer = _
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java Wed Jan 16 22:22:46 2013
@@ -20,10 +20,7 @@ package org.apache.activemq.apollo.dto;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.*;
/**
* <p>
@@ -57,4 +54,6 @@ public class EntryStatusDTO {
@XmlAttribute(name="is_prefetched")
public boolean is_prefetched;
+ @XmlElement
+ public QueueConsumerLinkDTO acquirer;
}
\ No newline at end of file
Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/MessageStatusDTO.java (from r1432955, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/MessageStatusDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/MessageStatusDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java&r1=1432955&r2=1434448&rev=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/MessageStatusDTO.java Wed Jan 16 22:22:46 2013
@@ -16,32 +16,57 @@
*/
package org.apache.activemq.apollo.dto;
+
+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
/**
+ * <p>
+ * </p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name="queue_consumer_link")
-@XmlAccessorType(XmlAccessType.FIELD)
@JsonIgnoreProperties(ignoreUnknown = true)
-public class QueueConsumerLinkDTO extends LinkDTO {
+public class MessageStatusDTO {
- public long position = 0;
+ /**
+ * Additional
+ */
+ public EntryStatusDTO entry;
- public int acquired_count;
- public long acquired_size;
+ /**
+ * When the message will expire
+ */
+ public long expiration;
- public long total_ack_count;
- public long total_nack_count;
+ /**
+ * Is the delivery persistent?
+ */
+ public boolean persistent = false;
- public Double ack_item_rate;
- public Double ack_size_rate;
+ /**
+ * The encoding that the message is stored in.
+ */
+ public String codec;
+
+ /**
+ * A map of all the headers in the mesasge.
+ */
+ public HashMap<String, Object> headers = new HashMap<String, Object>();
/**
- * What the consumer is currently waiting on
+ * The body of the message in base 64 encoding.
*/
- @XmlAttribute(name="waiting_on")
- public String waiting_on;
+ public String base64_body;
+
+ /**
+ * Has the body been truncated.
+ */
+ public boolean body_truncated;
+
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java Wed Jan 16 22:22:46 2013
@@ -28,13 +28,13 @@ import javax.xml.bind.annotation.*;
@JsonIgnoreProperties(ignoreUnknown = true)
public class QueueConsumerLinkDTO extends LinkDTO {
- public long position = 0;
+ public Long position;
- public int acquired_count;
- public long acquired_size;
+ public Integer acquired_count;
+ public Long acquired_size;
- public long total_ack_count;
- public long total_nack_count;
+ public Long total_ack_count;
+ public Long total_nack_count;
public Double ack_item_rate;
public Double ack_size_rate;
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala Wed Jan 16 22:22:46 2013
@@ -72,6 +72,37 @@ class OpenwireMessage(val message:Active
}
}
+
+ override def headers_as_json: java.util.HashMap[String, Object] = {
+ val rc = new java.util.HashMap[String, Object]()
+ import collection.JavaConversions._
+
+ def fillin(name:String) = {
+ val t = getProperty(name)
+ if ( t !=null ) {
+ rc.put(name, t.asInstanceOf[Object])
+ }
+ }
+
+ fillin("JMSDeliveryMode")
+ fillin("JMSPriority")
+ fillin("JMSType")
+ fillin("JMSMessageID")
+ fillin("JMSDestination")
+ fillin("JMSReplyTo")
+ fillin("JMSCorrelationID")
+ fillin("JMSExpiration")
+ fillin("JMSXDeliveryCount")
+ fillin("JMSXUserID")
+ fillin("JMSXGroupID")
+ fillin("JMSXGroupSeq")
+
+ for( (x,y) <- message.getProperties ) {
+ rc.put(x,y)
+ }
+ rc
+ }
+
def getLocalConnectionId = message.getProducerId.getConnectionId
def codec = OpenwireMessageCodec
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jan 16 22:22:46 2013
@@ -149,6 +149,14 @@ case class StompFrameMessage(frame:Stomp
}
+ override def headers_as_json: java.util.HashMap[String, Object] = {
+ val rc = new java.util.HashMap[String, Object]
+ for( (k,v)<-headerIndex ) {
+ rc.put(k.toString, v.toString)
+ }
+ rc
+ }
+
def setDisposer(disposer: Runnable) = throw new UnsupportedOperationException
def retained = throw new UnsupportedOperationException
def retain = frame.retain
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala Wed Jan 16 22:22:46 2013
@@ -36,6 +36,7 @@ class StompUdpProtocol extends UdpProtoc
override def createProtocolHandler = new UdpProtocolHandler {
type ConfigTypeDTO = StompDTO
+ def configClass = classOf[ConfigTypeDTO]
var config:ConfigTypeDTO = _
var destination_parser = Stomp.destination_parser
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Wed Jan 16 22:22:46 2013
@@ -36,7 +36,10 @@ import javax.ws.rs.core.MediaType._
import javax.servlet.http.HttpServletResponse
import FutureResult._
import com.wordnik.swagger.annotations.{ApiOperation, Api}
-import language.implicitConversions;
+import language.implicitConversions
+import org.fusesource.hawtbuf.Buffer
+import org.apache.commons.codec.binary.Base64
+;
@Path( "/api/json/broker")
@Api(value = "/api/json/broker",
@@ -572,20 +575,43 @@ class BrokerResource() extends Resource
}
@GET @Path("/virtual-hosts/{id}/topic-queues/{name:.*}/{qid}")
- @ApiOperation(value = "Gets the status of a topic consumer queue.")
- def topic(@PathParam("id") id : String,@PathParam("name") name : String, @PathParam("qid") qid : Long,
- @QueryParam("entries") entries:Boolean,
- @QueryParam("producers") producers:Boolean,
- @QueryParam("consumers") consumers:Boolean):QueueStatusDTO = {
+ @ApiOperation(value = "Gets the status of a topic consumer queue.")
+ def topic(@PathParam("id") id : String,@PathParam("name") name : String, @PathParam("qid") qid : Long,
+ @QueryParam("entries") entries:Boolean,
+ @QueryParam("producers") producers:Boolean,
+ @QueryParam("consumers") consumers:Boolean):QueueStatusDTO = {
+ with_virtual_host(id) { host =>
+ val router:LocalRouter = host
+ val node = router.local_topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
+ val queue =router.queues_by_store_id.get(qid).getOrElse(result(NOT_FOUND))
+ monitoring(node) {
+ sync(queue) {
+ queue.status(entries, producers, consumers)
+ }
+ }
+ }
+ }
+
+
+ @GET @Path("/virtual-hosts/{id}/topics/{name:.*}/messages")
+ @ApiOperation(value = "Gets a list of recent messages sent to the topic.")
+ def topic_messages(@PathParam("id") id : String, @PathParam("name") name : String,
+ @QueryParam("from") _from:java.lang.Long,
+ @QueryParam("max") _max:java.lang.Long,
+ @QueryParam("max_body") _max_body:java.lang.Integer):Array[MessageStatusDTO] = {
+ var from = OptionSupport(_from).getOrElse(0L)
+ var max = OptionSupport(_max).getOrElse(100L)
+ var max_body = OptionSupport(_max_body).getOrElse(0)
with_virtual_host(id) { host =>
- val router:LocalRouter = host
+ val rc = FutureResult[Array[MessageStatusDTO]]()
+ val router: LocalRouter = host
val node = router.local_topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
- val queue =router.queues_by_store_id.get(qid).getOrElse(result(NOT_FOUND))
monitoring(node) {
- sync(queue) {
- queue.status(entries, producers, consumers)
+ node.browse(from, max) { deliveries =>
+ rc.set(Success(deliveries.map(message_convert(max_body, _))))
}
}
+ rc
}
}
@@ -621,6 +647,30 @@ class BrokerResource() extends Resource
}
}
}
+ @GET @Path("/virtual-hosts/{id}/queues/{name:.*}/messages")
+ @ApiOperation(value = "Gets a list of messages that exist on the queue.")
+ def queue_messagesx(@PathParam("id") id : String, @PathParam("name") name : String,
+ @QueryParam("from") _from:java.lang.Long,
+ @QueryParam("to") _to:java.lang.Long,
+ @QueryParam("max") _max:java.lang.Long,
+ @QueryParam("max_body") _max_body:java.lang.Integer):Array[MessageStatusDTO] = {
+ var from = OptionSupport(_from).getOrElse(0L)
+ var to = OptionSupport(_to)
+ var max = OptionSupport(_max).getOrElse(100L)
+ var max_body = OptionSupport(_max_body).getOrElse(0)
+ with_virtual_host(id) { host =>
+ val rc = FutureResult[Array[MessageStatusDTO]]()
+ val router: LocalRouter = host
+ val node = router.local_queue_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
+ monitoring(node) {
+ node.browse(from, to, max) { deliveries =>
+ rc.set(Success(deliveries.map(message_convert(max_body, _))))
+ }
+ }
+ rc
+ }
+ }
+
@DELETE @Path("/virtual-hosts/{id}/queues/{name:.*}")
@Produces(Array(APPLICATION_JSON, APPLICATION_XML,TEXT_XML))
@@ -663,6 +713,28 @@ class BrokerResource() extends Resource
}
}
+ private def base64(buffer:Buffer) =
+ new String(Base64.encodeBase64(buffer.toByteArray), "UTF-8");
+
+ private def message_convert(max_body:Int, value:(EntryStatusDTO, Delivery)): MessageStatusDTO = {
+ val (entry, delivery) = value
+ val rc = new MessageStatusDTO
+ rc.codec = delivery.message.codec.id()
+ rc.headers = delivery.message.headers_as_json;
+ rc.expiration = delivery.expiration
+ rc.entry = entry
+
+ if( max_body > 0 ) {
+ val body = delivery.message.getBodyAs(classOf[Buffer])
+ if( body.length > max_body) {
+ body.length = max_body
+ rc.body_truncated = true
+ }
+ rc.base64_body = base64(body)
+ }
+ rc
+ }
+
@GET @Path("/virtual-hosts/{id}/dsubs")
@ApiOperation(value = "Gets a list of all the durable subscriptions that exist on the broker.")
@Produces(Array(APPLICATION_JSON))
@@ -720,6 +792,30 @@ class BrokerResource() extends Resource
}
}
+ @GET @Path("/virtual-hosts/{id}/dsub/{name:.*}/messages")
+ @ApiOperation(value = "Gets a list of the messages that exist on the durable sub.")
+ def dsub_messages(@PathParam("id") id : String, @PathParam("name") name : String,
+ @QueryParam("from") _from:java.lang.Long,
+ @QueryParam("to") _to:java.lang.Long,
+ @QueryParam("max") _max:java.lang.Long,
+ @QueryParam("max_body") _max_body:java.lang.Integer):Array[MessageStatusDTO] = {
+ var from = OptionSupport(_from).getOrElse(0L)
+ var to = OptionSupport(_to)
+ var max = OptionSupport(_max).getOrElse(100L)
+ var max_body = OptionSupport(_max_body).getOrElse(0)
+
+ with_virtual_host(id) { host =>
+ val rc = FutureResult[Array[MessageStatusDTO]]()
+ val router: LocalRouter = host
+ val node = router.local_dsub_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
+ monitoring(node) {
+ node.browse(from, to, max) { deliveries =>
+ rc.set(Success(deliveries.map(message_convert(max_body, _))))
+ }
+ }
+ rc
+ }
+ }
private def decode_path(name:String) = {
try {
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/css/app.css
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/css/app.css?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/css/app.css (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/css/app.css Wed Jan 16 22:22:46 2013
@@ -57,6 +57,10 @@ div.application {
float:right;
}
+.nowrap {
+ white-space:nowrap;
+}
+
@media (max-width: 767px) {
#footer {
margin-right: -20px;
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/index.html
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/index.html?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/index.html (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/index.html Wed Jan 16 22:22:46 2013
@@ -1,5 +1,5 @@
<!DOCTYPE html>
-<html lang="en">
+<html lang="en" xmlns="http://www.w3.org/1999/html">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
@@ -200,67 +200,151 @@
</table>
</div>
</div>
- <div class="span8">
- {{#if App.destination.producers}}
- <div>
- <h4>Producers</h4>
- <table class="details table table-bordered table-striped" style="">
- <tbody>
- <tr>
- <th>Name</th>
- <th>Kind</th>
- <th>Items</th>
- <th>Size</th>
- <th>Last Enqueue</th>
- </tr>
- {{#each App.destination.producers}}
- <tr>
- <td>{{label}}</td>
- <td>{{kind}}</td>
- <td>{{enqueue_item_counter}}</td>
- <td>{{enqueue_size_counter}}</td>
- <td>{{enqueue_date}}</td>
- </tr>
- {{/each}}
- </tbody>
- </table>
- </div>
- {{/if}}
- {{#if App.destination.consumers}}
- <div>
- <h4>Consumers</h4>
- <table class="details table table-bordered table-striped" style="">
- <tbody>
- <tr>
- <th>Name</th>
- <th>Kind</th>
- <th>Items</th>
- <th>Size</th>
- <th>Last Enqueue</th>
- <th>Acks</th>
- <th>Nacks</th>
+
+ <div class="span7">
+ <ul class="nav nav-tabs">
+ <li class="active"><a href="#TAB_Messages" data-toggle="tab">Messages ({{App.destination.metrics.queue_items}})</a></li>
+ <li><a href="#TAB_Producers" data-toggle="tab">Producers ({{App.destination.producers.length}})</a></li>
+ <li><a href="#TAB_Consumers" data-toggle="tab">Consumers ({{App.destination.producers.length}})</a></li>
+ </ul>
+ <div class="tabbable">
+ <div class="tab-content">
+ <div class="tab-pane active" id="TAB_Messages">
+
+ <div class="well form-horizontal">
+ <label class="control-label"><strong>Message: </strong></label>
+ <div class="controls">
+ {{view Ember.TextField class="input-xlarge" placeholder="body" valueBinding="App.MessagesController.send_body"}}
+ <a class="btn" href="#" {{action "send" target="App.MessagesController" on="click"}}>Send</a>
+ </div>
+ </div>
+
+ <table class="details table table-bordered" style="">
+ <tbody>
+ <tr>
+ <th> </th>
+ <th>Sequence</th>
+ <th>Codec</th>
+ <th>Persistent</th>
+ <th>Expires</th>
+ <th>Prefetched</th>
+ <th>Acquirer</th>
+ <th width="100%">Size</th>
+ </tr>
+ {{#each App.MessagesController}}
+ <tr>
+ <td>
+ <a {{action "toggle_details" . target="App.MessagesController" on="click"}}>
+ {{#if show_details}}
+ <i class="icon-caret-down"></i>
+ {{else}}
+ <i class="icon-caret-right"></i>
+ {{/if}}
+ </a>
+ </td>
+ <td>{{entry.seq}}</td>
+ <td>{{codec}}</td>
+ <td>{{persistent}}</td>
+ <td>{{expiration}}</td>
+ <td>{{entry.is_prefetched}}</td>
+ <td>{{entry.acquirer}}</td>
+ <td>{{entry.size}}</td>
+ </tr>
+ {{#if show_details}}
+ <tr>
+ <td colspan="9">
+ <!-- body -->
+ <div>
+
+ {{#if show_headers}}
+ <h4><a {{action "toggle_headers" . target="App.MessagesController" on="click"}}><i class="icon-caret-down"></i> Headers</a></h4>
+ <table class="details table table-bordered table-striped" style="">
+ <tbody>
+ {{#key_value headers}}
+ <tr>
+ <th class="nowrap">{{key}}</th>
+ <td width="100%">{{value}}</td>
+ </tr>
+ {{/key_value}}
+ </tbody>
+ </table>
+ {{else}}
+ <h4><a {{action "toggle_headers" . target="App.MessagesController" on="click"}}><i class="icon-caret-right"></i> Headers</a></h4>
+ {{/if}}
+ <h4>Body</h4>
+ <div>
+ <pre>{{body}}</pre>
+ </div>
+ </div>
+ </td>
+ </tr>
+ {{/if}}
+ {{/each}}
+ </tbody>
+ </table>
+
+ <div style="padding:.5em 0">
+ <button class="btn btn-small" {{action "remove" target="App.MessagesController" on="click"}}>Delete</button>
+ </div>
+ </div>
+ <div class="tab-pane" id="TAB_Producers">
+ <table class="details table table-bordered table-striped" style="">
+ <tbody>
+ <tr>
+ <th>Name</th>
+ <th>Kind</th>
+ <th>Items</th>
+ <th>Size</th>
<th>Last Enqueue</th>
- <th>Ack Rate</th>
- <th>Status</th>
- </tr>
- {{#each App.destination.consumers}}
- <tr>
- <td>{{label}}</td>
- <td>{{kind}}</td>
- <td>{{enqueue_item_counter}}</td>
- <td>{{enqueue_size_counter}}</td>
- <td>{{enqueue_date}}</td>
- <td>{{total_ack_count}}</td>
- <td>{{total_nack_count}}</td>
- <td>{{ack_item_rate}} msgs/sec {{ack_size_rate}} bytes/sec</td>
- <td>{{waiting_on}}</td>
- </tr>
- {{/each}}
- </tbody>
- </table>
+ </tr>
+ {{#each App.destination.producers}}
+ <tr>
+ <td>{{label}}</td>
+ <td>{{kind}}</td>
+ <td>{{enqueue_item_counter}}</td>
+ <td>{{enqueue_size_counter}}</td>
+ <td>{{enqueue_date}}</td>
+ </tr>
+ {{/each}}
+ </tbody>
+ </table>
+ </div>
+ <div class="tab-pane" id="TAB_Consumers">
+ <table class="details table table-bordered table-striped" style="">
+ <tbody>
+ <tr>
+ <th>Name</th>
+ <th>Kind</th>
+ <th>Items</th>
+ <th>Size</th>
+ <th>Last Enqueue</th>
+ <th>Acks</th>
+ <th>Nacks</th>
+ <th>Last Enqueue</th>
+ <th>Ack Rate</th>
+ <th>Status</th>
+ </tr>
+ {{#each App.destination.consumers}}
+ <tr>
+ <td>{{label}}</td>
+ <td>{{kind}}</td>
+ <td>{{enqueue_item_counter}}</td>
+ <td>{{enqueue_size_counter}}</td>
+ <td>{{enqueue_date}}</td>
+ <td>{{total_ack_count}}</td>
+ <td>{{total_nack_count}}</td>
+ <td>{{ack_item_rate}} msgs/sec {{ack_size_rate}} bytes/sec</td>
+ <td>{{waiting_on}}</td>
+ </tr>
+ {{/each}}
+ </tbody>
+ </table>
+ </div>
+ </div>
</div>
- {{/if}}
</div>
+
+
</div>
{{/if}}
</div>
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js Wed Jan 16 22:22:46 2013
@@ -1,3 +1,48 @@
+
+// Install a base64 decoding function if one is not available.
+if( !window.atob ) {
+ window.atob = function(text){
+ text = text.replace(/\s/g,"");
+ if(!(/^[a-z0-9\+\/\s]+\={0,2}$/i.test(text)) || text.length % 4 > 0){
+ throw new Error("Not a base64-encoded string.");
+ }
+ var digits = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/",
+ cur, prev, digitNum,
+ i=0,
+ result = [];
+ text = text.replace(/=/g, "");
+ while(i < text.length){
+ cur = digits.indexOf(text.charAt(i));
+ digitNum = i % 4;
+ switch(digitNum){
+ case 1:
+ result.push(String.fromCharCode(prev << 2 | cur >> 4));
+ break;
+ case 2:
+ result.push(String.fromCharCode((prev & 0x0f) << 4 | cur >> 2));
+ break;
+ case 3:
+ result.push(String.fromCharCode((prev & 3) << 6 | cur));
+ break;
+ }
+ prev = cur;
+ i++;
+ }
+ return result.join("");
+ }
+}
+
+Handlebars.registerHelper("key_value", function(name, fn) {
+ var obj = this[name];
+ var buffer = "", key;
+ for (key in obj) {
+ if (obj.hasOwnProperty(key)) {
+ buffer += fn({key: key, value: obj[key]});
+ }
+ }
+ return buffer;
+});
+
App = Em.Application.create({
ready: function() {
var self = this;
@@ -244,6 +289,7 @@ App.VirtualHostController = Em.ArrayCont
},
onSelectedChange: function() {
App.set("destination", null)
+ App.MessagesController.clear();
this.refresh();
}.observes("selected")
});
@@ -332,6 +378,10 @@ App.DestinationsController = Ember. Arra
App.destination = null;
App.DestinationController = Em.Controller.create({
+
+ tabs:["Messages","Producers","Consumers"],
+ selected_tab:"Messages",
+
destinationBinding:"App.destination",
selectedBinding:"App.DestinationsController.selected",
clear: function() {
@@ -341,6 +391,7 @@ App.DestinationController = Em.Controlle
var selected = this.get("selected")
if( selected==null ) {
App.set('destination', null);
+ App.MessagesController.clear();
} else {
var virtual_host = App.DestinationsController.get("virtual_host");
var kind = App.DestinationsController.get("kind");
@@ -358,10 +409,82 @@ App.DestinationController = Em.Controlle
});
App.set('destination', data);
});
+ var max_body = 100;
+ App.ajax("GET", "/broker/virtual-hosts/"+virtual_host+"/"+kind+"/"+selected+"/messages?from=0&max=100&max_body="+max_body, function(data) {
+ App.MessagesController.clear();
+ data.forEach(function(item){
+ if( item.base64_body ) {
+ var rc = atob(item.base64_body);
+ if( item.body_truncated ) {
+ rc += "..."
+ }
+ item.body = rc;
+ }
+ if( item.expiration==0 ) {
+ item.expiration = "no"
+ } else {
+ item.expiration = new Date(connector.state_since);
+ }
+ App.MessagesController.addObject(Ember.Object.create(item));
+ });
+ });
}
}.observes("selected"),
});
+
+App.MessagesController = Ember. ArrayController.create({
+ content: [],
+
+ toggle_headers: function(event) {
+ event.context.set("show_headers", !event.context.get("show_headers"));
+ },
+ toggle_details: function(event) {
+ event.context.set("show_details", !event.context.get("show_details"));
+ },
+
+ body: "",
+
+ send: function() {
+// var virtual_host = this.get('virtual_host');
+// var kind = this.get('kind');
+// var create_name = this.get('create_name');
+ this.set('body', "");
+// App.ajax("PUT", "/broker/virtual-hosts/"+virtual_host+"/"+kind+"/"+create_name, function(data) {
+// App.DestinationsController.refresh();
+// });
+ },
+
+ all_checked:false,
+ check_all_toggle: function() {
+ var all_checked= this.get("all_checked");
+ this.get('content').forEach(function(item){
+ item.set('checked', all_checked);
+ });
+ }.observes("all_checked"),
+
+ remove: function() {
+ var virtual_host = this.get('virtual_host');
+ var kind = this.get('kind');
+ var content = this.get('content');
+ content.forEach(function(item){
+ var checked = item.get('checked');
+ if( checked ) {
+ var name = item.get(0);
+ App.ajax("DELETE", "/broker/virtual-hosts/"+virtual_host+"/"+kind+"/"+name, function(data) {
+ App.DestinationsController.refresh();
+ });
+ }
+ });
+ },
+
+ selected:null,
+ select: function(event) {
+ this.set("selected", event.context.get(0))
+ },
+
+});
+
Ember.View.create({
templateName: 'notifications',
}).appendTo("#notifications");
|