activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1434448 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protoc...
Date Wed, 16 Jan 2013 22:22:46 GMT
Author: chirino
Date: Wed Jan 16 22:22:46 2013
New Revision: 1434448

URL: http://svn.apache.org/viewvc?rev=1434448&view=rev
Log:
Add message browsing support to the REST interface.

Added:
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/MessageStatusDTO.java
      - copied, changed from r1432955, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/css/app.css
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/index.html
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala Wed Jan 16 22:22:46 2013
@@ -25,7 +25,7 @@ import org.apache.activemq.apollo.broker
 import org.fusesource.hawtbuf.Buffer
 import org.fusesource.hawtbuf.AsciiBuffer
 import org.fusesource.hawtbuf.UTF8Buffer
-import org.apache.qpid.proton.amqp.{UnsignedLong, UnsignedInteger}
+import org.apache.qpid.proton.amqp.{UnsignedByte, UnsignedShort, UnsignedLong, UnsignedInteger}
 import org.apache.qpid.proton.amqp.messaging.{Properties, Header}
 import org.apache.qpid.proton.message.impl.MessageImpl
 
@@ -247,6 +247,104 @@ class AmqpMessage(private var encoded_bu
     rc
   }
 
+
+  override def headers_as_json: java.util.HashMap[String, Object] = {
+    val rc = new java.util.HashMap[String, Object]()
+    import collection.JavaConversions._
+
+    def convert(v: AnyRef) = (v match {
+      case v: UnsignedByte => new java.lang.Integer(v.shortValue())
+      case v: UnsignedShort => new java.lang.Integer(v.intValue())
+      case v: UnsignedInteger => new java.lang.Long(v.longValue())
+      case v: UnsignedLong => new java.lang.Long(v.longValue())
+      case _ => v
+    })
+
+    if ( decoded.getHeader!=null ) {
+      val header = decoded.getHeader
+      if ( header.getDeliveryCount !=null ) {
+        rc.put("header.delivery_count", new java.lang.Long(header.getDeliveryCount.longValue()))
+      }
+      if ( header.getDurable !=null ) {
+        rc.put("header.durable", new java.lang.Boolean(header.getDurable.booleanValue()))
+      }
+      if ( header.getFirstAcquirer !=null ) {
+        rc.put("header.first_acquirer", new java.lang.Boolean(header.getFirstAcquirer.booleanValue()))
+      }
+      if ( header.getPriority !=null ) {
+        rc.put("header.priority", new java.lang.Integer(header.getPriority.intValue()))
+      }
+      if ( header.getTtl !=null ) {
+        rc.put("header.ttl", new java.lang.Long(header.getTtl.longValue()))
+      }
+    }
+
+    if( decoded.getProperties != null ) {
+      val properties = decoded.getProperties
+      if ( properties.getAbsoluteExpiryTime !=null ) {
+        rc.put("property.absolute_expiry_time", new java.lang.Long(properties.getAbsoluteExpiryTime.getTime()))
+      }
+      if ( properties.getContentEncoding !=null ) {
+        rc.put("property.content_encoding", properties.getContentEncoding.toString)
+      }
+      if ( properties.getContentType !=null ) {
+        rc.put("property.content_type", properties.getContentType.toString)
+      }
+      if ( properties.getCorrelationId !=null ) {
+        rc.put("property.correlation_id", properties.getCorrelationId.toString)
+      }
+      if ( properties.getCreationTime !=null ) {
+        rc.put("property.creation_time",  new java.lang.Long(properties.getCreationTime.getTime))
+      }
+      if ( properties.getGroupId !=null ) {
+        rc.put("property.group_id", properties.getGroupId)
+      }
+      if ( properties.getGroupSequence !=null ) {
+        rc.put("property.group_sequence", new java.lang.Long(properties.getGroupSequence.longValue()))
+      }
+      if ( properties.getMessageId !=null ) {
+        rc.put("property.message_id", properties.getMessageId)
+      }
+      if ( properties.getReplyTo !=null ) {
+        rc.put("property.reply_to", properties.getReplyTo)
+      }
+      if ( properties.getReplyToGroupId !=null ) {
+        rc.put("property.reply_to_group_id", properties.getReplyToGroupId)
+      }
+      if ( properties.getSubject !=null ) {
+        rc.put("property.subject", properties.getSubject)
+      }
+      if ( properties.getTo !=null ) {
+        rc.put("property.to", properties.getTo)
+      }
+      if ( properties.getUserId !=null ) {
+        rc.put("property.user_id", properties.getUserId.toString)
+      }
+    }
+
+    if( decoded.getDeliveryAnnotations !=null ) {
+      val annotations = decoded.getDeliveryAnnotations
+      for( (k,v:AnyRef) <- annotations.getValue ) {
+        rc.put("annotation."+k, convert(v))
+      }
+    }
+
+    if( decoded.getApplicationProperties !=null ) {
+      val properties = decoded.getApplicationProperties
+      for( (k,v:AnyRef) <- properties.getValue ) {
+        rc.put("app."+k, convert(v))
+      }
+    }
+
+    if( decoded.getFooter !=null ) {
+      val footer = decoded.getFooter
+      for( (k,v:AnyRef) <- footer.getValue ) {
+        rc.put("footer."+k, convert(v))
+      }
+    }
+    rc
+  }
+
   def release() {}
   def retain() {}
   def retained(): Int = 0

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jan 16 22:22:46 2013
@@ -120,6 +120,8 @@ trait Message extends Filterable with Re
    */
   def codec:MessageCodec
 
+  def headers_as_json = new java.util.HashMap[String, Object]()
+
   def encoded:Buffer = codec.encode(this).buffer
 
   def message_group: String = null

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jan 16 22:22:46 2013
@@ -332,6 +332,41 @@ class Queue(val router: LocalRouter, val
     rc
   }
 
+  def browse(from_seq:Long, to:Option[Long], max:Long)(func: (Array[(EntryStatusDTO, Delivery)])=>Unit):Unit = {
+    var result = ListBuffer[(EntryStatusDTO, Delivery)]()
+    def load_from(start:Long):Unit = {
+      assert_executing
+      var cur = head_entry.getNext
+      while(true) {
+        if( cur == null || result.size >= max || ( to.isDefined && cur.seq > to.get) ) {
+          func(result.toArray)
+          return
+        }
+        val next = cur.getNext
+        if ( cur.seq >= start ) {
+          cur.state match {
+            case state:QueueEntry#Loaded =>
+              result.append((create_entry_status(cur), state.delivery))
+            case state:QueueEntry#Swapped =>
+              state.swapped_in_watchers ::=(()=>{
+                load_from(cur.seq) // resume loading
+              })
+              cur.load(consumer_swapped_in)
+              return
+            case state:QueueEntry#SwappedRange =>
+              state.swapped_in_watchers ::=(()=>{
+                load_from(cur.seq)
+              })
+              cur.load(consumer_swapped_in)
+              return
+          }
+        }
+        cur = next
+      }
+    }
+    load_from(from_seq)
+  }
+
   def status(entries:Boolean=false, include_producers:Boolean=false, include_consumers:Boolean=false) = {
     val rc = new QueueStatusDTO
     rc.id = this.id
@@ -353,17 +388,7 @@ class Queue(val router: LocalRouter, val
     if( entries ) {
       var cur = this.head_entry
       while( cur!=null ) {
-
-        val e = new EntryStatusDTO
-        e.seq = cur.seq
-        e.count = cur.count
-        e.size = cur.size
-        e.consumer_count = cur.parked.size
-        e.is_prefetched = cur.prefetched
-        e.state = cur.label
-
-        rc.entries.add(e)
-
+        rc.entries.add(create_entry_status(cur))
         cur = if( cur == this.tail_entry ) {
           null
         } else {
@@ -397,41 +422,7 @@ class Queue(val router: LocalRouter, val
 
     if( include_consumers ) {
       for( sub <- this.all_subscriptions.values ) {
-        val link = new QueueConsumerLinkDTO
-        sub.consumer.connection match {
-          case Some(connection) =>
-            link.kind = "connection"
-            link.id = connection.id.toString
-            link.label = connection.transport.getRemoteAddress.toString
-          case _ =>
-            link.kind = "unknown"
-            link.label = "unknown"
-        }
-        link.position = sub.pos.seq
-        link.enqueue_item_counter = sub.session.enqueue_item_counter
-        link.enqueue_size_counter = sub.session.enqueue_size_counter
-        link.enqueue_ts = sub.session.enqueue_ts
-        link.total_ack_count = sub.total_ack_count
-        link.total_nack_count = sub.total_nack_count
-        link.acquired_size = sub.acquired_size
-        link.acquired_count = sub.acquired_count
-        sub.ack_rates match {
-          case Some((items_per_sec, size_per_sec) ) =>
-            link.ack_item_rate = items_per_sec
-            link.ack_size_rate = size_per_sec
-          case _ =>
-        }
-
-        link.waiting_on = if( sub.full ) {
-          "consumer"
-        } else if( sub.pos.is_tail ) {
-          "producer"
-        } else if( !sub.pos.is_loaded ) {
-          "load"
-        } else {
-          "dispatch"
-        }
-        rc.consumers.add(link)
+        rc.consumers.add(sub.create_link_dto())
       }
     } else {
       rc.consumers = null
@@ -439,6 +430,21 @@ class Queue(val router: LocalRouter, val
     rc
   }
 
+
+  def create_entry_status(cur: QueueEntry): EntryStatusDTO = {
+    val rc = new EntryStatusDTO
+    rc.seq = cur.seq
+    rc.count = cur.count
+    rc.size = cur.size
+    rc.consumer_count = cur.parked.size
+    rc.is_prefetched = cur.prefetched
+    rc.state = cur.label
+    if( cur.acquiring_subscription != null ) {
+      rc.acquirer = cur.acquiring_subscription.create_link_dto(false)
+    }
+    rc
+  }
+
   def update(on_completed:Task) = dispatch_queue {
 
     val prev_persistent = tune_persistent

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Wed Jan 16 22:22:46 2013
@@ -189,6 +189,7 @@ class QueueEntry(val queue:Queue, val se
   def messageKey = state.message_key
   def is_swapped_or_swapping_out = state.is_swapped_or_swapping_out
   def is_acquired = state.is_acquired
+  def acquiring_subscription = state.acquiring_subscription
   def dispatch() = state.dispatch
   def memory_space = state.memory_space
 
@@ -296,7 +297,8 @@ class QueueEntry(val queue:Queue, val se
     /**
      * Is the entry acquired by a subscription.
      */
-    def is_acquired = false
+    def is_acquired = acquiring_subscription!=null
+    def acquiring_subscription:Subscription = null
 
     /**
      * @returns true if the entry is either swapped or swapping.
@@ -399,7 +401,7 @@ class QueueEntry(val queue:Queue, val se
     assert( delivery!=null, "delivery cannot be null")
 
     var acquirer:Subscription = _
-    override def is_acquired = acquirer!=null
+    override def acquiring_subscription = acquirer
 
     override def memory_space = space
 
@@ -723,14 +725,13 @@ class QueueEntry(val queue:Queue, val se
     override def redelivery_count = _redeliveries
     override def redelivered = _redeliveries = ((_redeliveries+1).min(Short.MaxValue)).toShort
 
+    override def acquiring_subscription = acquirer
     override def count = 1
 
     override def as_swapped = this
 
     override def is_swapped_or_swapping_out = true
 
-    override def is_acquired = acquirer!=null
-
     override def memory_space = space
 
     def label = {
@@ -773,7 +774,6 @@ class QueueEntry(val queue:Queue, val se
       }
     }
 
-
     def to_delivery = {
       val delivery = new Delivery()
       delivery.seq = seq
@@ -787,6 +787,13 @@ class QueueEntry(val queue:Queue, val se
       delivery
     }
 
+    var swapped_in_watchers = List[()=>Unit]()
+    def fire_swapped_in_watchers = {
+      val watchers = swapped_in_watchers
+      swapped_in_watchers = Nil
+      watchers.foreach(_())
+    }
+
     def swapped_in(messageRecord:MessageRecord) = {
       if( space!=null ) {
 //        debug("Loaded message seq: ", seq )
@@ -803,9 +810,8 @@ class QueueEntry(val queue:Queue, val se
         queue.individual_swapped_items -= 1
         state = new Loaded(delivery, true, space)
         space = null
-      } else {
-//        debug("Ignoring store load of: ", messageKey)
       }
+      fire_swapped_in_watchers
     }
 
 
@@ -927,6 +933,13 @@ class QueueEntry(val queue:Queue, val se
     }
     override def toString = { "swapped_range:{ swapping_in: "+loading+", count: "+count+", size: "+size+"}" }
 
+    var swapped_in_watchers = List[()=>Unit]()
+    def fire_swapped_in_watchers = {
+      val watchers = swapped_in_watchers
+      swapped_in_watchers = Nil
+      watchers.foreach(_())
+    }
+
     override def swap_in(space:MemorySpace):Unit = {
       if( !loading ) {
         loading = true
@@ -972,8 +985,8 @@ class QueueEntry(val queue:Queue, val se
           parked.foreach(_.advance(next))
           next :::= parked
           queue.trigger_swap
-
           unlink
+          fire_swapped_in_watchers
         }
 
         def attempt_load(attempt_counter:Int):Unit = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Wed Jan 16 22:22:46 2013
@@ -21,6 +21,7 @@ import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.util.list._
+import org.apache.activemq.apollo.dto.QueueConsumerLinkDTO
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -49,6 +50,45 @@ class Subscription(val queue:Queue, val 
 
   var enqueue_size_per_interval = new CircularBuffer[Int](15)
 
+  def create_link_dto(include_metrics:Boolean=true) = {
+    val link = new QueueConsumerLinkDTO
+    consumer.connection match {
+      case Some(connection) =>
+        link.kind = "connection"
+        link.id = connection.id.toString
+        link.label = connection.transport.getRemoteAddress.toString
+      case _ =>
+        link.kind = "unknown"
+        link.label = "unknown"
+    }
+    if ( include_metrics ) {
+      link.position = pos.seq
+      link.enqueue_item_counter = session.enqueue_item_counter
+      link.enqueue_size_counter = session.enqueue_size_counter
+      link.enqueue_ts = session.enqueue_ts
+      link.total_ack_count = total_ack_count
+      link.total_nack_count = total_nack_count
+      link.acquired_size = acquired_size
+      link.acquired_count = acquired_count
+      ack_rates match {
+        case Some((items_per_sec, size_per_sec) ) =>
+          link.ack_item_rate = items_per_sec
+          link.ack_size_rate = size_per_sec
+        case _ =>
+      }
+      link.waiting_on = if( full ) {
+        "consumer"
+      } else if( pos.is_tail ) {
+        "producer"
+      } else if( !pos.is_loaded ) {
+        "load"
+      } else {
+        "dispatch"
+      }
+    }
+    link
+  }
+
   def avg_enqueue_size_per_interval = {
     var rc = 0
     if( enqueue_size_per_interval.size > 0 ) {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Wed Jan 16 22:22:46 2013
@@ -22,7 +22,6 @@ import org.apache.activemq.apollo.dto._
 import java.util.concurrent.TimeUnit
 import org.fusesource.hawtdispatch._
 import collection.mutable.{HashSet, HashMap, ListBuffer}
-import java.lang.Long
 import security.SecuredResource
 
 /**
@@ -323,6 +322,19 @@ class Topic(val router:LocalRouter, val 
     }
   }
 
+  def browse(from_seq:Long, max:Long)(func: (Array[(EntryStatusDTO, Delivery)])=>Unit):Unit = {
+    val msg = retained_message
+    if ( msg==null ) {
+      func(Array())
+    } else {
+      val status = new EntryStatusDTO()
+      status.seq = retained_message.seq
+      status.size = retained_message.size
+      status.state = "loaded"
+      status.is_prefetched = true;
+      func(Array((status, retained_message)))
+    }
+  }
 
   def update(on_completed:Task) = {
     refresh_config

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala Wed Jan 16 22:22:46 2013
@@ -121,6 +121,8 @@ trait DecodedUdpMessage {
 abstract class UdpProtocolHandler extends ProtocolHandler {
   import UdpProtocolHandler._
   type ConfigTypeDTO <: ProtocolDTO
+  def configClass:Class[ConfigTypeDTO]
+
   def protocol = "udp"
   var session_id:Option[String] = None
 
@@ -148,10 +150,13 @@ abstract class UdpProtocolHandler extend
 
     configure(connection.connector.config match {
       case connector_config:AcceptingConnectorDTO =>
-        connector_config.protocols.flatMap{ _ match {
-          case x:ConfigTypeDTO => Some(x)
-          case _ => None
-        }}.headOption
+        connector_config.protocols.flatMap{ x=>
+          if( x.getClass == configClass) {
+            Some(x.asInstanceOf[ConfigTypeDTO])
+          } else {
+            None
+          }
+        }.headOption
       case _ => None
     })
   }
@@ -290,6 +295,7 @@ class UdpProtocol extends BaseProtocol {
 
   def createProtocolHandler:ProtocolHandler = new UdpProtocolHandler {
     type ConfigTypeDTO = UdpDTO
+    def configClass = classOf[ConfigTypeDTO]
 
     var default_host:VirtualHost = _
     var topic_address:AsciiBuffer = _

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java Wed Jan 16 22:22:46 2013
@@ -20,10 +20,7 @@ package org.apache.activemq.apollo.dto;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.*;
 
 /**
  * <p>
@@ -57,4 +54,6 @@ public class EntryStatusDTO {
     @XmlAttribute(name="is_prefetched")
     public boolean is_prefetched;
 
+    @XmlElement
+    public QueueConsumerLinkDTO acquirer;
 }
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/MessageStatusDTO.java (from r1432955, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/MessageStatusDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/MessageStatusDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java&r1=1432955&r2=1434448&rev=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/MessageStatusDTO.java Wed Jan 16 22:22:46 2013
@@ -16,32 +16,57 @@
  */
 package org.apache.activemq.apollo.dto;
 
+
+
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 
 import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 
 /**
+ * <p>
+ * </p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="queue_consumer_link")
-@XmlAccessorType(XmlAccessType.FIELD)
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class QueueConsumerLinkDTO extends LinkDTO {
+public class MessageStatusDTO {
 
-    public long position = 0;
+    /**
+     * Additional status information about the entry.
+     */
+    public EntryStatusDTO entry;
 
-    public int acquired_count;
-    public long acquired_size;
+    /**
+     * When the message will expire
+     */
+    public long expiration;
 
-    public long total_ack_count;
-    public long total_nack_count;
+    /**
+     * Is the delivery persistent?
+     */
+    public boolean persistent = false;
 
-    public Double ack_item_rate;
-    public Double ack_size_rate;
+    /**
+     * The encoding that the message is stored in.
+     */
+    public String codec;
+
+    /**
+     * A map of all the headers in the message.
+     */
+    public HashMap<String, Object> headers = new HashMap<String, Object>();
 
     /**
-     * What the consumer is currently waiting on
+     * The body of the message in base 64 encoding.
      */
-    @XmlAttribute(name="waiting_on")
-	public String waiting_on;
+    public String base64_body;
+
+    /**
+     * Has the body been truncated.
+     */
+    public boolean body_truncated;
+
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerLinkDTO.java Wed Jan 16 22:22:46 2013
@@ -28,13 +28,13 @@ import javax.xml.bind.annotation.*;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class QueueConsumerLinkDTO extends LinkDTO {
 
-    public long position = 0;
+    public Long position;
 
-    public int acquired_count;
-    public long acquired_size;
+    public Integer acquired_count;
+    public Long acquired_size;
 
-    public long total_ack_count;
-    public long total_nack_count;
+    public Long total_ack_count;
+    public Long total_nack_count;
 
     public Double ack_item_rate;
     public Double ack_size_rate;

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala Wed Jan 16 22:22:46 2013
@@ -72,6 +72,37 @@ class OpenwireMessage(val message:Active
     }
   }
 
+
+  override def headers_as_json: java.util.HashMap[String, Object] = {
+    val rc = new java.util.HashMap[String, Object]()
+    import collection.JavaConversions._
+
+    def fillin(name:String) = {
+      val t = getProperty(name)
+      if ( t !=null ) {
+        rc.put(name, t.asInstanceOf[Object])
+      }
+    }
+
+    fillin("JMSDeliveryMode")
+    fillin("JMSPriority")
+    fillin("JMSType")
+    fillin("JMSMessageID")
+    fillin("JMSDestination")
+    fillin("JMSReplyTo")
+    fillin("JMSCorrelationID")
+    fillin("JMSExpiration")
+    fillin("JMSXDeliveryCount")
+    fillin("JMSXUserID")
+    fillin("JMSXGroupID")
+    fillin("JMSXGroupSeq")
+
+    for( (x,y) <- message.getProperties ) {
+      rc.put(x,y)
+    }
+    rc
+  }
+
   def getLocalConnectionId = message.getProducerId.getConnectionId
 
   def codec = OpenwireMessageCodec

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jan 16 22:22:46 2013
@@ -149,6 +149,14 @@ case class StompFrameMessage(frame:Stomp
   }
 
 
+  override def headers_as_json: java.util.HashMap[String, Object] = {
+    val rc = new java.util.HashMap[String, Object]
+    for( (k,v)<-headerIndex ) {
+      rc.put(k.toString, v.toString)
+    }
+    rc
+  }
+
   def setDisposer(disposer: Runnable) = throw new UnsupportedOperationException
   def retained = throw new UnsupportedOperationException
   def retain = frame.retain

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala Wed Jan 16 22:22:46 2013
@@ -36,6 +36,7 @@ class StompUdpProtocol extends UdpProtoc
   override def createProtocolHandler = new UdpProtocolHandler {
 
     type ConfigTypeDTO = StompDTO
+    def configClass = classOf[ConfigTypeDTO]
 
     var config:ConfigTypeDTO = _
     var destination_parser = Stomp.destination_parser

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Wed Jan 16 22:22:46 2013
@@ -36,7 +36,10 @@ import javax.ws.rs.core.MediaType._
 import javax.servlet.http.HttpServletResponse
 import FutureResult._
 import com.wordnik.swagger.annotations.{ApiOperation, Api}
-import language.implicitConversions;
+import language.implicitConversions
+import org.fusesource.hawtbuf.Buffer
+import org.apache.commons.codec.binary.Base64
+;
 
 @Path(          "/api/json/broker")
 @Api(value =    "/api/json/broker",
@@ -572,20 +575,43 @@ class BrokerResource() extends Resource 
   }
 
   @GET @Path("/virtual-hosts/{id}/topic-queues/{name:.*}/{qid}")
-  @ApiOperation(value = "Gets the status of a topic consumer queue.")
-  def topic(@PathParam("id") id : String,@PathParam("name") name : String,  @PathParam("qid") qid : Long,
-            @QueryParam("entries") entries:Boolean,
-            @QueryParam("producers") producers:Boolean,
-            @QueryParam("consumers") consumers:Boolean):QueueStatusDTO = {
+    // Reports the status of one consumer queue attached to a topic.
+    //   id    - virtual host id taken from the request path
+    //   name  - topic name, looked up in local_topic_domain.destination_by_id
+    //   qid   - queue id, looked up in queues_by_store_id
+    //   entries / producers / consumers - flags passed unchanged to queue.status
+    // A missing topic or queue falls back to result(NOT_FOUND).
+    @ApiOperation(value = "Gets the status of a topic consumer queue.")
+    def topic(@PathParam("id") id : String,@PathParam("name") name : String,  @PathParam("qid") qid : Long,
+              @QueryParam("entries") entries:Boolean,
+              @QueryParam("producers") producers:Boolean,
+              @QueryParam("consumers") consumers:Boolean):QueueStatusDTO = {
+      with_virtual_host(id) { host =>
+        // The type ascription is what turns the host into a LocalRouter here.
+        val router:LocalRouter = host
+        val node = router.local_topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
+        val queue =router.queues_by_store_id.get(qid).getOrElse(result(NOT_FOUND))
+        // The status is read inside sync(queue), itself wrapped by monitoring(node).
+        monitoring(node) {
+          sync(queue) {
+            queue.status(entries, producers, consumers)
+          }
+        }
+      }
+    }
+
+
+  /**
+   * Lists messages recently sent to a topic.
+   *
+   * @param id        virtual host id from the request path
+   * @param name      topic name, looked up in `local_topic_domain`
+   * @param _from     optional `from` query parameter; falls back to 0
+   * @param _max      optional `max` query parameter; falls back to 100
+   * @param _max_body optional `max_body` query parameter; falls back to 0 and
+   *                  is passed to `message_convert` for every delivery
+   * @return the future result `rc`, set by the browse callback to the
+   *         deliveries converted with `message_convert`
+   */
+  @GET @Path("/virtual-hosts/{id}/topics/{name:.*}/messages")
+  @ApiOperation(value = "Gets a list of recent messages sent to the topic.")
+  def topic_messages(@PathParam("id") id : String, @PathParam("name") name : String,
+                     @QueryParam("from") _from:java.lang.Long,
+                     @QueryParam("max") _max:java.lang.Long,
+                     @QueryParam("max_body") _max_body:java.lang.Integer):Array[MessageStatusDTO] = {
+    // Resolved once and never reassigned, so plain vals are enough.
+    val from = OptionSupport(_from).getOrElse(0L)
+    val max = OptionSupport(_max).getOrElse(100L)
+    val max_body = OptionSupport(_max_body).getOrElse(0)
     with_virtual_host(id) { host =>
-      val router:LocalRouter = host
+      val rc = FutureResult[Array[MessageStatusDTO]]()
+      val router: LocalRouter = host
       val node = router.local_topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
-      val queue =router.queues_by_store_id.get(qid).getOrElse(result(NOT_FOUND))
+      // NOTE(review): the value produced by monitoring(node) is not used; only
+      // rc is returned. Confirm rc still completes when monitoring rejects.
       monitoring(node) {
-        sync(queue) {
-          queue.status(entries, producers, consumers)
+        node.browse(from, max) { deliveries =>
+          rc.set(Success(deliveries.map(message_convert(max_body, _))))
         }
       }
+      rc
     }
   }
 
@@ -621,6 +647,30 @@ class BrokerResource() extends Resource 
       }
     }
   }
+  /**
+   * Lists messages that exist on a queue.
+   *
+   * @param id        virtual host id from the request path
+   * @param name      queue name, looked up in `local_queue_domain`
+   * @param _from     optional `from` query parameter; falls back to 0
+   * @param _to       optional `to` query parameter, passed on as an option
+   * @param _max      optional `max` query parameter; falls back to 100
+   * @param _max_body optional `max_body` query parameter; falls back to 0 and
+   *                  is passed to `message_convert` for every delivery
+   * @return the future result `rc`, set by the browse callback to the
+   *         deliveries converted with `message_convert`
+   */
+  @GET @Path("/virtual-hosts/{id}/queues/{name:.*}/messages")
+  @ApiOperation(value = "Gets a list of messages that exist on the queue.")
+  def queue_messagesx(@PathParam("id") id : String, @PathParam("name") name : String,
+                      @QueryParam("from") _from:java.lang.Long,
+                      @QueryParam("to") _to:java.lang.Long,
+                      @QueryParam("max") _max:java.lang.Long,
+                      @QueryParam("max_body") _max_body:java.lang.Integer):Array[MessageStatusDTO] = {
+    // Resolved once and never reassigned, so plain vals are enough.
+    val from = OptionSupport(_from).getOrElse(0L)
+    val to = OptionSupport(_to)
+    val max = OptionSupport(_max).getOrElse(100L)
+    val max_body = OptionSupport(_max_body).getOrElse(0)
+    with_virtual_host(id) { host =>
+      val rc = FutureResult[Array[MessageStatusDTO]]()
+      val router: LocalRouter = host
+      val node = router.local_queue_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
+      // NOTE(review): the value produced by monitoring(node) is not used; only
+      // rc is returned. Confirm rc still completes when monitoring rejects.
+      monitoring(node) {
+        node.browse(from, to, max) { deliveries =>
+          rc.set(Success(deliveries.map(message_convert(max_body, _))))
+        }
+      }
+      rc
+    }
+  }
+
 
   @DELETE @Path("/virtual-hosts/{id}/queues/{name:.*}")
   @Produces(Array(APPLICATION_JSON, APPLICATION_XML,TEXT_XML))
@@ -663,6 +713,28 @@ class BrokerResource() extends Resource 
     }
   }
 
+  /** Base64-encodes the bytes of `buffer` and returns the result as a string. */
+  private def base64(buffer:Buffer) = {
+    val encoded = Base64.encodeBase64(buffer.toByteArray)
+    new String(encoded, "UTF-8")
+  }
+
+  /**
+   * Builds the REST view of one browsed message.
+   *
+   * @param max_body maximum number of body bytes to include; when it is not
+   *                 positive the body is left out entirely
+   * @param value    the entry status paired with its delivery
+   * @return a new MessageStatusDTO carrying the codec id, headers, expiration,
+   *         entry status and, when requested, the base64 encoded body
+   */
+  private def message_convert(max_body:Int, value:(EntryStatusDTO, Delivery)): MessageStatusDTO = {
+    val (entry, delivery) = value
+    val rc = new MessageStatusDTO
+    rc.codec = delivery.message.codec.id()
+    rc.headers = delivery.message.headers_as_json
+    rc.expiration = delivery.expiration
+    rc.entry = entry
+
+    if( max_body > 0 ) {
+      val body = delivery.message.getBodyAs(classOf[Buffer])
+      if( body.length > max_body ) {
+        // Truncate through a separate Buffer over the same bytes. Assigning to
+        // body.length would shorten the Buffer object handed out by the
+        // message, which may still be needed for delivery.
+        rc.body_truncated = true
+        rc.base64_body = base64(new Buffer(body.data, body.offset, max_body))
+      } else {
+        rc.base64_body = base64(body)
+      }
+    }
+    rc
+  }
+
   @GET @Path("/virtual-hosts/{id}/dsubs")
   @ApiOperation(value = "Gets a list of all the durable subscriptions that exist on the broker.")
   @Produces(Array(APPLICATION_JSON))
@@ -720,6 +792,30 @@ class BrokerResource() extends Resource 
     }
   }
 
+  /**
+   * Lists messages that exist on a durable subscription.
+   *
+   * @param id        virtual host id from the request path
+   * @param name      durable subscription name, looked up in `local_dsub_domain`
+   * @param _from     optional `from` query parameter; falls back to 0
+   * @param _to       optional `to` query parameter, passed on as an option
+   * @param _max      optional `max` query parameter; falls back to 100
+   * @param _max_body optional `max_body` query parameter; falls back to 0 and
+   *                  is passed to `message_convert` for every delivery
+   * @return the future result `rc`, set by the browse callback to the
+   *         deliveries converted with `message_convert`
+   */
+  @GET @Path("/virtual-hosts/{id}/dsub/{name:.*}/messages")
+  @ApiOperation(value = "Gets a list of the messages that exist on the durable sub.")
+  def dsub_messages(@PathParam("id") id : String, @PathParam("name") name : String,
+             @QueryParam("from") _from:java.lang.Long,
+             @QueryParam("to") _to:java.lang.Long,
+             @QueryParam("max") _max:java.lang.Long,
+             @QueryParam("max_body") _max_body:java.lang.Integer):Array[MessageStatusDTO] = {
+    // Resolved once and never reassigned, so plain vals are enough.
+    val from = OptionSupport(_from).getOrElse(0L)
+    val to = OptionSupport(_to)
+    val max = OptionSupport(_max).getOrElse(100L)
+    val max_body = OptionSupport(_max_body).getOrElse(0)
+
+    with_virtual_host(id) { host =>
+      val rc = FutureResult[Array[MessageStatusDTO]]()
+      val router: LocalRouter = host
+      val node = router.local_dsub_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
+      // NOTE(review): the value produced by monitoring(node) is not used; only
+      // rc is returned. Confirm rc still completes when monitoring rejects.
+      monitoring(node) {
+        node.browse(from, to, max) { deliveries =>
+          rc.set(Success(deliveries.map(message_convert(max_body, _))))
+        }
+      }
+      rc
+    }
+  }
 
   private def decode_path(name:String) = {
     try {

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/css/app.css
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/css/app.css?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/css/app.css (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/css/app.css Wed Jan 16 22:22:46 2013
@@ -57,6 +57,10 @@ div.application {
   float:right;
 }
 
+/* Keeps the element's text on a single line. */
+.nowrap {
+  white-space: nowrap;
+}
+
 @media (max-width: 767px) {
   #footer {
     margin-right: -20px;

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/index.html
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/index.html?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/index.html (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/index.html Wed Jan 16 22:22:46 2013
@@ -1,5 +1,5 @@
 <!DOCTYPE html>
-<html lang="en">
+<html lang="en" xmlns="http://www.w3.org/1999/html">
 <head>
   <meta charset="UTF-8">
   <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
@@ -200,67 +200,151 @@
                     </table>
                   </div>
                 </div>
-                <div class="span8">
-                  {{#if App.destination.producers}}
-                  <div>
-                      <h4>Producers</h4>
-                      <table class="details table table-bordered table-striped" style="">
-                        <tbody>
-                        <tr>
-                          <th>Name</th>
-                          <th>Kind</th>
-                          <th>Items</th>
-                          <th>Size</th>
-                          <th>Last Enqueue</th>
-                        </tr>
-                        {{#each App.destination.producers}}
-                        <tr>
-                          <td>{{label}}</td>
-                          <td>{{kind}}</td>
-                          <td>{{enqueue_item_counter}}</td>
-                          <td>{{enqueue_size_counter}}</td>
-                          <td>{{enqueue_date}}</td>
-                        </tr>
-                        {{/each}}
-                        </tbody>
-                      </table>
-                  </div>
-                  {{/if}}
-                  {{#if App.destination.consumers}}
-                  <div>
-                      <h4>Consumers</h4>
-                      <table class="details table table-bordered table-striped" style="">
-                        <tbody>
-                        <tr>
-                          <th>Name</th>
-                          <th>Kind</th>
-                          <th>Items</th>
-                          <th>Size</th>
-                          <th>Last Enqueue</th>
-                            <th>Acks</th>
-                            <th>Nacks</th>
+
+                <div class="span7">
+                  <!-- Tab headers for the destination detail panes, each with a live count. -->
+                  <ul class="nav nav-tabs">
+                    <li class="active"><a href="#TAB_Messages" data-toggle="tab">Messages ({{App.destination.metrics.queue_items}})</a></li>
+                    <li><a href="#TAB_Producers" data-toggle="tab">Producers ({{App.destination.producers.length}})</a></li>
+                    <li><a href="#TAB_Consumers" data-toggle="tab">Consumers ({{App.destination.consumers.length}})</a></li>
+                  </ul>
+                  <div class="tabbable">
+                    <div class="tab-content">
+                      <div class="tab-pane active" id="TAB_Messages">
+
+                        <div class="well form-horizontal">
+                          <label class="control-label"><strong>Message: </strong></label>
+                          <div class="controls">
+                            {{view Ember.TextField class="input-xlarge" placeholder="body" valueBinding="App.MessagesController.send_body"}}
+                            <a class="btn" href="#" {{action "send" target="App.MessagesController" on="click"}}>Send</a>
+                          </div>
+                        </div>
+
+                        <table class="details table table-bordered" style="">
+                          <tbody>
+                          <tr>
+                            <th>&nbsp;</th>
+                            <th>Sequence</th>
+                            <th>Codec</th>
+                            <th>Persistent</th>
+                            <th>Expires</th>
+                            <th>Prefetched</th>
+                            <th>Acquirer</th>
+                            <th width="100%">Size</th>
+                          </tr>
+                          {{#each App.MessagesController}}
+                          <tr>
+                            <td>
+                              <a {{action "toggle_details" . target="App.MessagesController" on="click"}}>
+                              {{#if show_details}}
+                              <i class="icon-caret-down"></i>
+                              {{else}}
+                              <i class="icon-caret-right"></i>
+                              {{/if}}
+                              </a>
+                            </td>
+                            <td>{{entry.seq}}</td>
+                            <td>{{codec}}</td>
+                            <td>{{persistent}}</td>
+                            <td>{{expiration}}</td>
+                            <td>{{entry.is_prefetched}}</td>
+                            <td>{{entry.acquirer}}</td>
+                            <td>{{entry.size}}</td>
+                          </tr>
+                          {{#if show_details}}
+                          <!-- Expanded details for one message; the cell spans all eight table columns. -->
+                          <tr>
+                            <td colspan="8">
+                            <!-- body -->
+                            <div>
+
+                              {{#if show_headers}}
+                              <h4><a {{action "toggle_headers" . target="App.MessagesController" on="click"}}><i class="icon-caret-down"></i> Headers</a></h4>
+                              <table class="details table table-bordered table-striped" style="">
+                                <tbody>
+                                {{#key_value headers}}
+                                <tr>
+                                  <th class="nowrap">{{key}}</th>
+                                  <td width="100%">{{value}}</td>
+                                </tr>
+                                {{/key_value}}
+                                </tbody>
+                              </table>
+                              {{else}}
+                              <h4><a {{action "toggle_headers" . target="App.MessagesController" on="click"}}><i class="icon-caret-right"></i> Headers</a></h4>
+                              {{/if}}
+                              <h4>Body</h4>
+                              <div>
+                                <pre>{{body}}</pre>
+                              </div>
+                            </div>
+                            </td>
+                          </tr>
+                          {{/if}}
+                          {{/each}}
+                          </tbody>
+                        </table>
+
+                        <div style="padding:.5em 0">
+                          <button class="btn btn-small" {{action "remove" target="App.MessagesController" on="click"}}>Delete</button>
+                        </div>
+                      </div>
+                      <div class="tab-pane" id="TAB_Producers">
+                        <table class="details table table-bordered table-striped" style="">
+                          <tbody>
+                          <tr>
+                            <th>Name</th>
+                            <th>Kind</th>
+                            <th>Items</th>
+                            <th>Size</th>
                             <th>Last Enqueue</th>
-                            <th>Ack Rate</th>
-                            <th>Status</th>
-                        </tr>
-                        {{#each App.destination.consumers}}
-                        <tr>
-                          <td>{{label}}</td>
-                          <td>{{kind}}</td>
-                          <td>{{enqueue_item_counter}}</td>
-                          <td>{{enqueue_size_counter}}</td>
-                          <td>{{enqueue_date}}</td>
-                          <td>{{total_ack_count}}</td>
-                          <td>{{total_nack_count}}</td>
-                          <td>{{ack_item_rate}} msgs/sec {{ack_size_rate}} bytes/sec</td>
-                          <td>{{waiting_on}}</td>
-                        </tr>
-                        {{/each}}
-                        </tbody>
-                      </table>
+                          </tr>
+                          {{#each App.destination.producers}}
+                          <tr>
+                            <td>{{label}}</td>
+                            <td>{{kind}}</td>
+                            <td>{{enqueue_item_counter}}</td>
+                            <td>{{enqueue_size_counter}}</td>
+                            <td>{{enqueue_date}}</td>
+                          </tr>
+                          {{/each}}
+                          </tbody>
+                        </table>
+                      </div>
+                      <!-- Consumers pane: one row per consumer; the nine headers line up with the nine cells. -->
+                      <div class="tab-pane" id="TAB_Consumers">
+                        <table class="details table table-bordered table-striped" style="">
+                          <tbody>
+                          <tr>
+                            <th>Name</th>
+                            <th>Kind</th>
+                            <th>Items</th>
+                            <th>Size</th>
+                            <th>Last Enqueue</th>
+                            <th>Acks</th>
+                            <th>Nacks</th>
+                            <th>Ack Rate</th>
+                            <th>Status</th>
+                          </tr>
+                          {{#each App.destination.consumers}}
+                          <tr>
+                            <td>{{label}}</td>
+                            <td>{{kind}}</td>
+                            <td>{{enqueue_item_counter}}</td>
+                            <td>{{enqueue_size_counter}}</td>
+                            <td>{{enqueue_date}}</td>
+                            <td>{{total_ack_count}}</td>
+                            <td>{{total_nack_count}}</td>
+                            <td>{{ack_item_rate}} msgs/sec {{ack_size_rate}} bytes/sec</td>
+                            <td>{{waiting_on}}</td>
+                          </tr>
+                          {{/each}}
+                          </tbody>
+                        </table>
+                      </div>
+                    </div>
                   </div>
-                  {{/if}}
                 </div>
+
+
             </div>
             {{/if}}
           </div>

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js?rev=1434448&r1=1434447&r2=1434448&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js Wed Jan 16 22:22:46 2013
@@ -1,3 +1,48 @@
+
+// Install a base64 decoding function if one is not available.
+if( !window.atob ) {
+  window.atob = function(text){
+    text = text.replace(/\s/g,"");
+    if(!(/^[a-z0-9\+\/\s]+\={0,2}$/i.test(text)) || text.length % 4 > 0){
+      throw new Error("Not a base64-encoded string.");
+    }
+    var digits = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/",
+      cur, prev, digitNum,
+      i=0,
+      result = [];
+    text = text.replace(/=/g, "");
+    while(i < text.length){
+      cur = digits.indexOf(text.charAt(i));
+      digitNum = i % 4;
+      switch(digitNum){
+        case 1:
+          result.push(String.fromCharCode(prev << 2 | cur >> 4));
+          break;
+        case 2:
+          result.push(String.fromCharCode((prev & 0x0f) << 4 | cur >> 2));
+          break;
+        case 3:
+          result.push(String.fromCharCode((prev & 3) << 6 | cur));
+          break;
+      }
+      prev = cur;
+      i++;
+    }
+    return result.join("");
+  }
+}
+
+Handlebars.registerHelper("key_value", function(name, fn) {
+  var obj = this[name];
+  var buffer = "", key;
+  for (key in obj) {
+    if (obj.hasOwnProperty(key)) {
+      buffer += fn({key: key, value: obj[key]});
+    }
+  }
+  return buffer;
+});
+
 App = Em.Application.create({
   ready: function() {
     var self = this;
@@ -244,6 +289,7 @@ App.VirtualHostController = Em.ArrayCont
   },
   onSelectedChange: function() {
     App.set("destination", null)
+    App.MessagesController.clear();
     this.refresh();
   }.observes("selected")
 });
@@ -332,6 +378,10 @@ App.DestinationsController = Ember. Arra
 
 App.destination = null;
 App.DestinationController = Em.Controller.create({
+
+  tabs:["Messages","Producers","Consumers"],
+  selected_tab:"Messages",
+
   destinationBinding:"App.destination",
   selectedBinding:"App.DestinationsController.selected",
   clear: function() {
@@ -341,6 +391,7 @@ App.DestinationController = Em.Controlle
     var selected = this.get("selected")
     if( selected==null ) {
       App.set('destination', null);
+      App.MessagesController.clear();
     } else {
       var virtual_host = App.DestinationsController.get("virtual_host");
       var kind = App.DestinationsController.get("kind");
@@ -358,10 +409,82 @@ App.DestinationController = Em.Controlle
         });
         App.set('destination', data);
       });
+      var max_body = 100;
+      App.ajax("GET", "/broker/virtual-hosts/"+virtual_host+"/"+kind+"/"+selected+"/messages?from=0&max=100&max_body="+max_body, function(data) {
+        App.MessagesController.clear();
+        data.forEach(function(item){
+          if( item.base64_body ) {
+            var rc = atob(item.base64_body);
+            if( item.body_truncated ) {
+              rc += "..."
+            }
+            item.body = rc;
+          }
+          if( item.expiration==0 ) {
+            item.expiration = "no"
+          } else {
+            item.expiration = new Date(connector.state_since);
+          }
+          App.MessagesController.addObject(Ember.Object.create(item));
+        });
+      });
     }
   }.observes("selected"),
 });
 
+
+App.MessagesController = Ember. ArrayController.create({
+  content: [],
+
+  toggle_headers: function(event) {
+    event.context.set("show_headers", !event.context.get("show_headers"));
+  },
+  toggle_details: function(event) {
+    event.context.set("show_details", !event.context.get("show_details"));
+  },
+
+  body: "",
+
+  send: function() {
+//    var virtual_host = this.get('virtual_host');
+//    var kind = this.get('kind');
+//    var create_name = this.get('create_name');
+    this.set('body', "");
+//    App.ajax("PUT", "/broker/virtual-hosts/"+virtual_host+"/"+kind+"/"+create_name, function(data) {
+//      App.DestinationsController.refresh();
+//    });
+  },
+
+  all_checked:false,
+  check_all_toggle: function() {
+    var all_checked= this.get("all_checked");
+    this.get('content').forEach(function(item){
+      item.set('checked', all_checked);
+    });
+  }.observes("all_checked"),
+
+  remove: function() {
+    var virtual_host = this.get('virtual_host');
+    var kind = this.get('kind');
+    var content = this.get('content');
+    content.forEach(function(item){
+      var checked = item.get('checked');
+      if( checked ) {
+        var name = item.get(0);
+        App.ajax("DELETE", "/broker/virtual-hosts/"+virtual_host+"/"+kind+"/"+name, function(data) {
+          App.DestinationsController.refresh();
+        });
+      }
+    });
+  },
+
+  selected:null,
+  select: function(event) {
+    this.set("selected", event.context.get(0))
+  },
+
+});
+
 Ember.View.create({
   templateName: 'notifications',
 }).appendTo("#notifications");



Mime
View raw message