activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1162079 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/...
Date Fri, 26 Aug 2011 12:22:24 GMT
Author: chirino
Date: Fri Aug 26 12:22:24 2011
New Revision: 1162079

URL: http://svn.apache.org/viewvc?rev=1162079&view=rev
Log:
Made DeliverySession extend SessionSink[Delivery] and added helper filter.  REST api now properly
reports the enqueue timestamp for consumers and producers.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
Fri Aug 26 12:22:24 2011
@@ -343,7 +343,7 @@ class Broker() extends BaseService {
 
   }
 
-  def schedule_periodic_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
+  def schedule_periodic_maintenance:Unit = dispatch_queue.after(100, TimeUnit.MILLISECONDS)
{
     if( service_state.is_starting_or_started ) {
       now = System.currentTimeMillis
       schedule_periodic_maintenance

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Fri Aug 26 12:22:24 2011
@@ -75,18 +75,9 @@ trait DeliveryConsumer extends Retained 
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait DeliverySession extends Sink[Delivery] {
-  /**
-   * The number of deliveries accepted by this session.
-   */
-  def enqueue_item_counter:Long
-  /**
-   * The total size of the deliveries accepted by this session.
-   */
-  def enqueue_size_counter:Long
+trait DeliverySession extends SessionSink[Delivery] {
   def producer:DeliveryProducer
   def consumer:DeliveryConsumer
-  def remaining_capacity:Int
   def close:Unit
 }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Aug 26 12:22:24 2011
@@ -75,7 +75,9 @@ class Queue(val router: LocalRouter, val
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
-  val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery)
+  val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery)
{
+    override def time_stamp = now
+  }
 
   // sequence numbers.. used to track what's in the store.
   var message_seq_counter = 1L
@@ -294,6 +296,7 @@ class Queue(val router: LocalRouter, val
       }
       link.enqueue_item_counter = session.enqueue_item_counter
       link.enqueue_size_counter = session.enqueue_size_counter
+      link.enqueue_ts = session.enqueue_ts
       rc.producers.add(link)
     }
 
@@ -309,8 +312,9 @@ class Queue(val router: LocalRouter, val
           link.label = "unknown"
       }
       link.position = sub.pos.seq
-      link.enqueue_item_counter = sub.total_dispatched_count
-      link.enqueue_size_counter = sub.total_dispatched_size
+      link.enqueue_item_counter = sub.session.enqueue_item_counter
+      link.enqueue_size_counter = sub.session.enqueue_size_counter
+      link.enqueue_ts = sub.session.enqueue_ts
       link.total_ack_count = sub.total_ack_count
       link.total_nack_count = sub.total_nack_count
       link.acquired_size = sub.acquired_size
@@ -734,30 +738,22 @@ class Queue(val router: LocalRouter, val
 
   def is_persistent = tune_persistent
 
-  def connect(p: DeliveryProducer) = new DeliverySession {
+  class QueueDeliverySession(val producer: DeliveryProducer) extends DeliverySession with
SessionSinkFilter[Delivery]{
     retain
 
-
     override def toString = Queue.this.toString
-
     override def consumer = Queue.this
 
-    override def producer = p
-
     val session_max = producer.send_buffer_size
-    val session = session_manager.open(producer.dispatch_queue, session_max)
+    val downstream = session_manager.open(producer.dispatch_queue, session_max)
 
     dispatch_queue {
       inbound_sessions += this
       addCapacity( session_max )
     }
 
-    def remaining_capacity = session.remaining_capacity
-    def enqueue_item_counter = session.accepted_count
-    def enqueue_size_counter = session.accepted_size
-
     def close = {
-      session_manager.close(session)
+      session_manager.close(downstream)
       dispatch_queue {
         addCapacity( -session_max )
         inbound_sessions -= this
@@ -765,27 +761,21 @@ class Queue(val router: LocalRouter, val
       release
     }
 
-    // Delegate all the flow control stuff to the session
-    def full = session.full
-
     def offer(delivery: Delivery) = {
-      if (session.full) {
+      if (downstream.full) {
         false
       } else {
         delivery.message.retain
         if( tune_persistent && delivery.uow!=null ) {
           delivery.uow.retain
         }
-        val rc = session.offer(delivery)
+        val rc = downstream.offer(delivery)
         assert(rc, "session should accept since it was not full")
         true
       }
     }
-
-    def refiller = session.refiller
-
-    def refiller_=(value: Runnable) = {session.refiller = value}
   }
+  def connect(p: DeliveryProducer) = new QueueDeliverySession(p)
 
   /////////////////////////////////////////////////////////////////////
   //
@@ -1720,9 +1710,6 @@ class Subscription(val queue:Queue, val 
   var avg_advanced_size = queue.tune_consumer_buffer
   var tail_parkings = 1
 
-  var total_dispatched_count = 0L
-  var total_dispatched_size = 0L
-
   var total_ack_count = 0L
   var total_nack_count = 0L
 
@@ -1834,15 +1821,8 @@ class Subscription(val queue:Queue, val 
 
   def matches(entry:Delivery) = session.consumer.matches(entry)
   def full = session.full
-  def offer(delivery:Delivery) = {
-    if( session.offer(delivery) ) {
-      total_dispatched_count += 1
-      total_dispatched_size += delivery.size
-      true
-    } else {
-      false
-    }
-  }
+
+  def offer(delivery:Delivery) = session.offer(delivery)
 
   def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Fri Aug 26 12:22:24 2011
@@ -230,11 +230,35 @@ class CreditWindowFilter[T](val downstre
 }
 
 trait SessionSink[T] extends Sink[T] {
-  def accepted_count:Long
-  def accepted_size:Long
+  /**
+   * The number of elements accepted by this session.
+   */
+  def enqueue_item_counter:Long
+
+  /**
+   * The total size of the elements accepted by this session.
+   */
+  def enqueue_size_counter:Long
+
+  /**
+   * The total size of the elements accepted by this session.
+   */
+  def enqueue_ts:Long
+  /**
+   * An estimate of the capacity left in the session before it stops
+   * accepting more elements.
+   */
   def remaining_capacity:Int
 }
 
+trait SessionSinkFilter[T] extends SessionSink[T] with SinkFilter[T] {
+  def downstream:SessionSink[T]
+  def enqueue_item_counter = downstream.enqueue_item_counter
+  def enqueue_size_counter = downstream.enqueue_size_counter
+  def enqueue_ts = downstream.enqueue_ts
+  def remaining_capacity = downstream.remaining_capacity
+}
+
 object SessionSinkMux {
   val default_session_max_credits = System.getProperty("apollo.default_session_max_credits",
""+(1024*32)).toInt
 }
@@ -313,6 +337,7 @@ class SessionSinkMux[T](val downstream:S
     }
   }
 
+  def time_stamp = 0L
 }
 
 /**
@@ -326,9 +351,11 @@ class Session[T](val producer_queue:Disp
   private def downstream = mux.source
 
   @volatile
-  var accepted_count = 0L
+  var enqueue_item_counter = 0L
+  @volatile
+  var enqueue_size_counter = 0L
   @volatile
-  var accepted_size = 0L
+  var enqueue_ts = 0L
 
   // create a source to coalesce credit events back to the producer side...
   val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
@@ -371,8 +398,9 @@ class Session[T](val producer_queue:Disp
     } else {
       val size = sizer.size(value)
 
-      accepted_count += 1
-      accepted_size += size
+      enqueue_item_counter += 1
+      enqueue_size_counter += size
+      enqueue_ts = mux.time_stamp
 
       add_credits(-size)
       downstream.merge((this, value))

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Fri Aug 26 12:22:24 2011
@@ -36,16 +36,16 @@ class Topic(val router:LocalRouter, val 
 
   var enqueue_item_counter = 0L
   var enqueue_size_counter = 0L
-  var enqueue_ts = System.currentTimeMillis()
+  var enqueue_ts = now
 
   var dequeue_item_counter = 0L
   var dequeue_size_counter = 0L
-  var dequeue_ts = System.currentTimeMillis()
+  var dequeue_ts = now
 
-  var proxy_sessions = new HashSet[ProxyDeliverySession]()
+  var proxy_sessions = new HashSet[TopicDeliverySession]()
 
   implicit def from_link(from:LinkDTO):(Long,Long,Long)=(from.enqueue_item_counter, from.enqueue_size_counter,
from.enqueue_ts)
-  implicit def from_session(from:ProxyDeliverySession):(Long,Long,Long)=(from.enqueue_item_counter,
from.enqueue_size_counter, from.enqueue_ts)
+  implicit def from_session(from:TopicDeliverySession):(Long,Long,Long)=(from.enqueue_item_counter,
from.enqueue_size_counter, from.enqueue_ts)
 
   def add_counters(to:LinkDTO, from:(Long,Long,Long)):Unit = {
     to.enqueue_item_counter += from._1
@@ -58,7 +58,9 @@ class Topic(val router:LocalRouter, val 
     to.enqueue_ts = to.enqueue_ts max from._3
   }
 
-  case class ProxyDeliverySession(session:DeliverySession) extends DeliverySession with SinkFilter[Delivery]
{
+  case class TopicDeliverySession(session:DeliverySession) extends DeliverySession with SessionSinkFilter[Delivery]
{
+    def downstream = session
+
     dispatch_queue {
       proxy_sessions.add(this)
     }
@@ -78,22 +80,10 @@ class Topic(val router:LocalRouter, val 
       }
     }
 
-    var enqueue_ts = now
-    def offer(value: Delivery) = {
-      if( session.offer(value) ) {
-        enqueue_ts = now
-        true
-      } else {
-        false
-      }
-    }
-
-    def downstream = session
-    def remaining_capacity = session.remaining_capacity
     def producer = session.producer
-    def enqueue_size_counter = session.enqueue_size_counter
-    def enqueue_item_counter = session.enqueue_item_counter
     def consumer = session.consumer
+
+    def offer(value: Delivery) = downstream.offer(value)
   }
 
   case class ProxyDeliveryConsumer(consumer:DeliveryConsumer, link:LinkDTO) extends DeliveryConsumer
{
@@ -104,7 +94,7 @@ class Topic(val router:LocalRouter, val 
     def is_persistent = consumer.is_persistent
     def dispatch_queue = consumer.dispatch_queue
     def connect(producer: DeliveryProducer) = {
-      new ProxyDeliverySession(consumer.connect(producer))
+      new TopicDeliverySession(consumer.connect(producer))
     }
   }
 
@@ -113,7 +103,7 @@ class Topic(val router:LocalRouter, val 
   var durable_subscriptions = ListBuffer[Queue]()
   var consumer_queues = HashMap[DeliveryConsumer, Queue]()
   var idled_at = 0L
-  val created_at = System.currentTimeMillis()
+  val created_at = now
   var auto_delete_after = 0
   var producer_counter = 0L
   var consumer_counter = 0L
@@ -230,11 +220,11 @@ class Topic(val router:LocalRouter, val 
   def check_idle {
     if (producers.isEmpty && consumers.isEmpty && durable_subscriptions.isEmpty)
{
       if (idled_at==0) {
-        val now = System.currentTimeMillis()
-        idled_at = now
+        val previously_idle_at = now
+        idled_at = previously_idle_at
         if( auto_delete_after!=0 ) {
           dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
-            if( now == idled_at ) {
+            if( previously_idle_at == idled_at ) {
               router.topic_domain.remove_destination(path, this)
             }
           }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Fri Aug 26 12:22:24 2011
@@ -23,8 +23,6 @@ import org.fusesource.hawtdispatch._
 import org.fusesource.hawtbuf._
 import collection.mutable.{ListBuffer, HashMap}
 
-import org.apache.activemq.apollo.broker._
-import BufferConversions._
 import java.io.IOException
 import org.apache.activemq.apollo.selector.SelectorParser
 import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
@@ -33,16 +31,19 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.apollo.util._
 import java.util.concurrent.TimeUnit
 import java.util.Map.Entry
-import protocol._
 import scala.util.continuations._
-import security.SecurityContext
-import support.advisory.AdvisorySupport
 import tcp.TcpTransport
 import codec.OpenWireFormat
 import command._
 import org.apache.activemq.apollo.openwire.dto.{OpenwireConnectionStatusDTO,OpenwireDTO}
 import org.apache.activemq.apollo.dto.{AcceptingConnectorDTO, TopicDestinationDTO, DurableSubscriptionDestinationDTO,
DestinationDTO}
 import org.apache.activemq.apollo.openwire.DestinationConverter._
+import org.apache.activemq.apollo.broker._
+import BufferConversions._
+import protocol._
+import security.SecurityContext
+import support.advisory.AdvisorySupport
+
 
 object OpenwireProtocolHandler extends Log {
   def unit:Unit = {}
@@ -87,6 +88,8 @@ class OpenwireProtocolHandler extends Pr
     last_command_id
   }
 
+  def broker = connection.connector.broker
+
   var producerRoutes = new LRUCache[List[DestinationDTO], DeliveryProducerRoute](10) {
     override def onCacheEviction(eldest: Entry[List[DestinationDTO], DeliveryProducerRoute])
= {
       host.router.disconnect(eldest.getKey.toArray, eldest.getValue)
@@ -195,7 +198,7 @@ class OpenwireProtocolHandler extends Pr
     resumeRead
     reset {
       suspendRead("virtual host lookup")
-      this.host = connection.connector.broker.get_default_virtual_host
+      this.host = broker.get_default_virtual_host
       resumeRead
       if(host==null) {
         async_die("Could not find default virtual host")
@@ -767,7 +770,9 @@ class OpenwireProtocolHandler extends Pr
 
     credit_window_filter.credit(0, info.getPrefetchSize)
 
-    val session_manager = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue,
Delivery)
+    val session_manager = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue,
Delivery) {
+      override def time_stamp = broker.now
+    }
 
     override def exclusive = info.isExclusive
     override def browser = info.isBrowser
@@ -860,17 +865,14 @@ class OpenwireProtocolHandler extends Pr
       }
     }
 
-    def connect(p:DeliveryProducer) = new DeliverySession with SinkFilter[Delivery] {
+    class OpenwireConsumerSession(val producer:DeliveryProducer) extends DeliverySession
with SessionSinkFilter[Delivery] {
+      producer.dispatch_queue.assertExecuting()
       retain
 
-      def producer = p
-      def consumer = ConsumerContext.this
+      val downstream = session_manager.open(producer.dispatch_queue, receive_buffer_size)
       var closed = false
 
-      val downstream = session_manager.open(producer.dispatch_queue, receive_buffer_size)
-      def remaining_capacity = downstream.remaining_capacity
-      def enqueue_item_counter = downstream.accepted_count
-      def enqueue_size_counter = downstream.accepted_size
+      def consumer = ConsumerContext.this
 
       def close = {
         assert(producer.dispatch_queue.isExecuting)
@@ -928,6 +930,8 @@ class OpenwireProtocolHandler extends Pr
       }
     }
 
+    def connect(p:DeliveryProducer) = new OpenwireConsumerSession(p)
+
     class TrackedAck(val ack:(DeliveryResult, StoreUOW)=>Unit) {
       var credited = false
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Fri Aug 26 12:22:24 2011
@@ -39,7 +39,6 @@ import org.apache.activemq.apollo.transp
 import java.security.cert.X509Certificate
 import collection.mutable.{ListBuffer, HashMap}
 import java.io.IOException
-import collection.immutable.List._
 
 
 case class RichBuffer(self:Buffer) extends Proxy {
@@ -85,6 +84,7 @@ class StompProtocolHandler extends Proto
 
   var connection_log:Log = StompProtocolHandler
   def protocol = "stomp"
+  def broker = connection.connector.broker
 
   def decode_header(value:Buffer):String = {
     var rc = new ByteArrayOutputStream(value.length)
@@ -347,7 +347,9 @@ class StompProtocolHandler extends Proto
 
     credit_window_filter.credit(initial_credit_window._1, initial_credit_window._2)
 
-    val session_manager = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue,
Delivery)
+    val session_manager = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue,
Delivery) {
+      override def time_stamp = broker.now
+    }
 
     override def dispose() = dispatchQueue {
       super.dispose()
@@ -373,26 +375,17 @@ class StompProtocolHandler extends Proto
       }
     }
 
-    def connect(p:DeliveryProducer) = new DeliverySession with SinkFilter[Delivery] {
-
-      // This session object should only be used from the dispatch queue context
-      // of the producer.
-
+    class StompConsumerSession(val producer:DeliveryProducer) extends DeliverySession with
SessionSinkFilter[Delivery] {
+      producer.dispatch_queue.assertExecuting()
       retain
 
+      val downstream = session_manager.open(producer.dispatch_queue, receive_buffer_size)
+
       override def toString = "connection to "+StompProtocolHandler.this.connection.transport.getRemoteAddress
 
-      def producer = p
       def consumer = StompConsumer.this
       var closed = false
 
-      val downstream = session_manager.open(producer.dispatch_queue, receive_buffer_size)
-
-      def remaining_capacity = downstream.remaining_capacity
-      def enqueue_item_counter = downstream.accepted_count
-      def enqueue_size_counter = downstream.accepted_size
-
-
       def close = {
         assert(producer.dispatch_queue.isExecuting)
         if( !closed ) {
@@ -456,6 +449,7 @@ class StompProtocolHandler extends Proto
       }
 
     }
+    def connect(p:DeliveryProducer) = new StompConsumerSession(p)
   }
 
 //  var session_manager:SessionSinkMux[StompFrame] = null
@@ -948,7 +942,7 @@ class StompProtocolHandler extends Proto
     }
 
     if( config.add_timestamp_header!=null ) {
-      rc ::= (encode_header(config.add_timestamp_header), ascii(System.currentTimeMillis().toString()))
+      rc ::= (encode_header(config.add_timestamp_header), ascii(broker.now.toString()))
     }
 
     // Do we need to add the user id?

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
Fri Aug 26 12:22:24 2011
@@ -119,7 +119,7 @@ case class BrokerResource() extends Reso
 
         result.id = broker.id
         result.jvm_metrics = create_jvm_metrics
-        result.current_time = System.currentTimeMillis
+        result.current_time = now
         result.state = broker.service_state.toString
         result.state_since = broker.service_state.since
         result.version = Broker.version
@@ -211,7 +211,7 @@ case class BrokerResource() extends Reso
         get_queue_metrics(broker)
       }
     }
-    rc.current_time = System.currentTimeMillis()
+    rc.current_time = now
     rc
   }
 
@@ -223,7 +223,7 @@ case class BrokerResource() extends Reso
         get_topic_metrics(broker)
       }
     }
-    rc.current_time = System.currentTimeMillis()
+    rc.current_time = now
     rc
   }
 
@@ -235,7 +235,7 @@ case class BrokerResource() extends Reso
         get_dsub_metrics(broker)
       }
     }
-    rc.current_time = System.currentTimeMillis()
+    rc.current_time = now
     rc
   }
 
@@ -248,7 +248,7 @@ case class BrokerResource() extends Reso
     val rc = aggregate_queue_metrics(List(queue, dsub))
     add_destination_metrics(rc, topic)
     rc.objects += topic.objects
-    rc.current_time = System.currentTimeMillis()
+    rc.current_time = now
     rc
   }
 
@@ -424,7 +424,7 @@ case class BrokerResource() extends Reso
         get_queue_metrics(host)
       }
     }
-    rc.current_time = System.currentTimeMillis()
+    rc.current_time = now
     rc
   }
 
@@ -435,7 +435,7 @@ case class BrokerResource() extends Reso
         get_topic_metrics(host)
       }
     }
-    rc.current_time = System.currentTimeMillis()
+    rc.current_time = now
     rc
   }
 
@@ -446,7 +446,7 @@ case class BrokerResource() extends Reso
         get_dsub_metrics(host)
       }
     }
-    rc.current_time = System.currentTimeMillis()
+    rc.current_time = now
     rc
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
Fri Aug 26 12:22:24 2011
@@ -275,6 +275,8 @@ abstract class Resource(parent:Resource=
     }
   }
 
+  def now = BrokerRegistry.list.headOption.map(_.now).getOrElse(System.currentTimeMillis())
+
   protected def with_broker[T](func: (org.apache.activemq.apollo.broker.Broker)=>FutureResult[T]):FutureResult[T]
= {
     BrokerRegistry.list.headOption match {
       case Some(broker)=>



Mime
View raw message