activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1023992 - in /activemq/activemq-apollo/trunk: apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/ apollo-util/src/main/scala/org/apache/activemq/apollo/util/
Date Mon, 18 Oct 2010 20:26:06 GMT
Author: chirino
Date: Mon Oct 18 20:26:06 2010
New Revision: 1023992

URL: http://svn.apache.org/viewvc?rev=1023992&view=rev
Log:
little bit more cleanup, fixed up watchdog trait and re-added, not currently being used at
the moment.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1023992&r1=1023991&r2=1023992&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
Mon Oct 18 20:26:06 2010
@@ -21,21 +21,18 @@ import _root_.java.lang.String
 
 import org.apache.activemq.apollo.broker._
 import org.scalatest._
-import java.io.{File, IOException}
-import org.apache.activemq.apollo.util.metric.{Period, MetricAggregator, MetricCounter}
+import java.io.File
+import org.apache.activemq.apollo.util.metric.{Period, MetricAggregator}
 import org.fusesource.hawtbuf.AsciiBuffer
 import collection.mutable.ListBuffer
 import java.net.URL
-import java.util.concurrent.TimeUnit
-import org.fusesource.hawtdispatch.ScalaDispatch._
 import org.apache.activemq.apollo.dto.BrokerDTO
-import org.apache.activemq.apollo.transport.TransportFactory
 import org.apache.activemq.apollo.util._
 
 /**
  *
  */
-abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach with ServiceController
{
+abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach {
   var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "6"))
   var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "1000"))
 
@@ -293,7 +290,6 @@ abstract class BrokerPerfSupport extends
     broker.config.connectors.get(0).advertise
   }
 
-
   def _createProducer(id: Int, messageSize: Int, destination: Destination): RemoteProducer
= {
     var producer = createProducer()
     producer.stopping = stopping
@@ -313,22 +309,21 @@ abstract class BrokerPerfSupport extends
     println("waiting for services to stop")
     stopping.set(true)
 
-    controlServices(false, producers, "producer shutdown")
-    controlServices(false, consumers, "consumer shutdown")
-    controlServices(false, brokers, "broker shutdown")
+    ServiceControl.stop(producers, "producer shutdown")
+    ServiceControl.stop(consumers, "consumer shutdown")
+    ServiceControl.stop(brokers, "broker shutdown")
   }
 
 
   def startBrokers() = {
-    controlServices(true, brokers, "test broker startup")
+    ServiceControl.start(brokers, "test broker startup")
   }
 
-
   def startClients() = {
-    controlServices(true, consumers, "test consumer startup")
+    ServiceControl.start(consumers, "test consumer startup")
     // let the consumers drain the destination for a bit...
     Thread.sleep(1000)
-    controlServices(true, producers, "test producer startup")
+    ServiceControl.start(producers, "test producer startup")
   }
 
   def fixed_sampling = true
@@ -367,11 +362,15 @@ abstract class BrokerPerfSupport extends
     // either we want to do x number of samples or we want to keep sampling while some condition
is true.
     if (fixed_sampling) {
 
-      // Do 1 period of warm up that's not counted...
-      println("Warming up...")
-      Thread.sleep(SAMPLE_PERIOD)
-      totalMessageSent += totalProducerRate.reset()
-      totalMessageSent += totalConsumerRate.reset()
+      // Do 1 period of warm up that's not counted, wait for the broker to load up it's store...
+      var messagesForWarmup = totalMessageSent
+
+      while (messagesForWarmup == totalMessageSent) {
+        println("Warming up...")
+        Thread.sleep(SAMPLE_PERIOD)
+        totalMessageSent += totalProducerRate.reset()
+        totalMessageSent += totalConsumerRate.reset()
+      }
 
       println("Sampling rates")
       for (i <- 0 until PERFORMANCE_SAMPLES) {
@@ -421,116 +420,3 @@ trait FixedSampling extends BrokerPerfSu
   }
 }
 
-abstract class RemoteConnection extends Connection {
-  var uri: String = null
-  var name: String = null
-
-  val rate = new MetricCounter()
-  var rateAggregator: MetricAggregator = null
-
-  var stopping: AtomicBoolean = null
-  var destination: Destination = null
-
-  def init = {
-    if (rate.getName == null) {
-      rate.name(name + " Rate")
-    }
-    rateAggregator.add(rate)
-  }
-
-  var callbackWhenConnected: Runnable = null
-
-  override protected def _start(onComplete: Runnable) = {
-    callbackWhenConnected = onComplete
-    transport = TransportFactory.connect(uri)
-    super._start(^ {})
-  }
-
-  override def onTransportConnected() = {
-    onConnected()
-    transport.resumeRead
-    callbackWhenConnected.run
-    callbackWhenConnected = null
-  }
-
-  protected def onConnected()
-
-  override def onTransportFailure(error: IOException) = {
-    if (!stopped) {
-      if (stopping.get()) {
-        transport.stop
-      } else {
-        onFailure(error)
-        if (callbackWhenConnected != null) {
-          warn("connect attempt failed. will retry connection..")
-          dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^ {
-            if (stopping.get()) {
-              callbackWhenConnected.run
-            } else {
-              // try to connect again...
-              transport = TransportFactory.connect(uri)
-              super._start(^ {})
-            }
-          })
-        }
-      }
-    }
-  }
-}
-
-abstract class RemoteConsumer extends RemoteConnection {
-  var thinkTime: Long = 0
-  var selector: String = null
-  var durable = false
-  var persistent = false
-}
-
-
-abstract class RemoteProducer extends RemoteConnection {
-  var messageIdGenerator: AtomicLong = null
-  var priority = 0
-  var persistent = false
-  var priorityMod = 0
-  var counter = 0
-  var producerId = 0
-  var property: String = null
-  var next: Delivery = null
-  var thinkTime: Long = 0
-
-  var filler: String = null
-  var payloadSize = 20
-
-  override def init = {
-    super.init
-
-    if (payloadSize > 0) {
-      var sb = new StringBuilder(payloadSize)
-      for (i <- 0 until payloadSize) {
-        sb.append(('a' + (i % 26)).toChar)
-      }
-      filler = sb.toString()
-    }
-  }
-
-  def createPayload(): String = {
-    if (payloadSize >= 0) {
-      var sb = new StringBuilder(payloadSize)
-      sb.append(name)
-      sb.append(':')
-      counter += 1
-      sb.append(counter)
-      sb.append(':')
-      var length = sb.length
-      if (length <= payloadSize) {
-        sb.append(filler.subSequence(0, payloadSize - length))
-        return sb.toString()
-      } else {
-        return sb.substring(0, payloadSize)
-      }
-    } else {
-      counter += 1
-      return name + ":" + (counter)
-    }
-  }
-
-}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala?rev=1023992&r1=1023991&r2=1023992&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
Mon Oct 18 20:26:06 2010
@@ -20,8 +20,7 @@ package org.apache.activemq.apollo.broke
 import org.apache.activemq.apollo.broker.Destination
 import tools.nsc.io.Directory
 import org.apache.activemq.apollo.util.metric.MetricAggregator
-import org.apache.activemq.apollo.util.FileSupport
-
+import org.apache.activemq.apollo.util.{ServiceControl, FileSupport}
 
 trait LargeInitialDB extends PersistentScenario {
   PURGE_STORE = false
@@ -54,7 +53,7 @@ trait LargeInitialDB extends PersistentS
 
     println("Using store at " + original + " and backup at " + backup)
 
-    controlService(true, sendBroker, "initial db broker startup")
+    ServiceControl.start(sendBroker, "initial db broker startup")
 
     PTP = true
     val dests: Array[Destination] = createDestinations(1)
@@ -62,7 +61,7 @@ trait LargeInitialDB extends PersistentS
     val producer: RemoteProducer = _createProducer(0, 20, dests(0))
     producer.persistent = true
 
-    controlService(true, producer, "initial db producer startup")
+    ServiceControl.start(producer, "initial db producer startup")
 
     val messages = 1000000L
 
@@ -72,8 +71,8 @@ trait LargeInitialDB extends PersistentS
       Thread.sleep(5000)
     }
 
-    controlService(false, producer, "producer shutdown")
-    controlService(false, sendBroker, "broker shutdown")
+    ServiceControl.stop(producer, "producer shutdown")
+    ServiceControl.stop(sendBroker, "broker shutdown")
 
     saveDB
   }

Added: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala?rev=1023992&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
Mon Oct 18 20:26:06 2010
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.broker.perf
+
+import org.apache.activemq.apollo.util.metric._
+import org.apache.activemq.apollo.broker.{Destination, Delivery, Connection}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.TimeUnit
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import java.io.IOException
+import org.apache.activemq.apollo.transport.TransportFactory
+
+abstract class RemoteConnection extends Connection {
+  var uri: String = null
+  var name: String = null
+
+  val rate = new MetricCounter()
+  var rateAggregator: MetricAggregator = null
+
+  var stopping: AtomicBoolean = null
+  var destination: Destination = null
+
+  def init = {
+    if (rate.getName == null) {
+      rate.name(name + " Rate")
+    }
+    rateAggregator.add(rate)
+  }
+
+  var callbackWhenConnected: Runnable = null
+
+  override protected def _start(onComplete: Runnable) = {
+    callbackWhenConnected = onComplete
+    transport = TransportFactory.connect(uri)
+    super._start(^ {})
+  }
+
+  override def onTransportConnected() = {
+    onConnected()
+    transport.resumeRead
+    callbackWhenConnected.run
+    callbackWhenConnected = null
+  }
+
+  protected def onConnected()
+
+  override def onTransportFailure(error: IOException) = {
+    if (!stopped) {
+      if (stopping.get()) {
+        transport.stop
+      } else {
+        onFailure(error)
+        if (callbackWhenConnected != null) {
+          warn("connect attempt failed. will retry connection..")
+          dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^ {
+            if (stopping.get()) {
+              callbackWhenConnected.run
+            } else {
+              // try to connect again...
+              transport = TransportFactory.connect(uri)
+              super._start(^ {})
+            }
+          })
+        }
+      }
+    }
+  }
+}
+
+abstract class RemoteConsumer extends RemoteConnection {
+  var thinkTime: Long = 0
+  var selector: String = null
+  var durable = false
+  var persistent = false
+
+  protected def messageReceived()
+}
+
+
+abstract class RemoteProducer extends RemoteConnection {
+  var messageIdGenerator: AtomicLong = null
+  var priority = 0
+  var persistent = false
+  var priorityMod = 0
+  var counter = 0
+  var producerId = 0
+  var property: String = null
+  var next: Delivery = null
+  var thinkTime: Long = 0
+
+  var filler: String = null
+  var payloadSize = 20
+
+  override def init = {
+    super.init
+
+    if (payloadSize > 0) {
+      var sb = new StringBuilder(payloadSize)
+      for (i <- 0 until payloadSize) {
+        sb.append(('a' + (i % 26)).toChar)
+      }
+      filler = sb.toString()
+    }
+  }
+
+  def createPayload(): String = {
+    if (payloadSize >= 0) {
+      var sb = new StringBuilder(payloadSize)
+      sb.append(name)
+      sb.append(':')
+      counter += 1
+      sb.append(counter)
+      sb.append(':')
+      var length = sb.length
+      if (length <= payloadSize) {
+        sb.append(filler.subSequence(0, payloadSize - length))
+        return sb.toString()
+      } else {
+        return sb.substring(0, payloadSize)
+      }
+    } else {
+      counter += 1
+      return name + ":" + (counter)
+    }
+  }
+
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1023992&r1=1023991&r2=1023992&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
Mon Oct 18 20:26:06 2010
@@ -30,12 +30,13 @@ import Stomp._
 import _root_.org.apache.activemq.apollo.stomp.StompFrame
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
-class StompRemoteConsumer extends RemoteConsumer with Logging {
+
+class StompRemoteConsumer extends RemoteConsumer {
   var outboundSink: OverflowSink[StompFrame] = null
 
   def onConnected() = {
-    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
-    outboundSink.refiller = ^{}
+    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink) {x => x})
+    outboundSink.refiller = ^ {}
 
     val stompDestination = if (destination.getDomain() == Router.QUEUE_DOMAIN) {
       ascii("/queue/" + destination.getName().toString());
@@ -96,7 +97,7 @@ class StompRemoteConsumer extends Remote
 class StompRemoteProducer extends RemoteProducer with Logging {
   var outboundSink: OverflowSink[StompFrame] = null
   var stompDestination: AsciiBuffer = null
-  var frame:StompFrame = null
+  var frame: StompFrame = null
 
   def send_next: Unit = {
       var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
@@ -118,8 +119,8 @@ class StompRemoteProducer extends Remote
   }
 
   def drain() = {
-    if( frame!=null ) {
-      if( !outboundSink.full ) {
+    if (frame != null) {
+      if (!outboundSink.full) {
         outboundSink.offer(frame)
         frame = null
         rate.increment
@@ -129,7 +130,7 @@ class StompRemoteProducer extends Remote
           }
         }
 
-        if( !persistent ) {
+        if (!persistent) {
           // if we are not going to wait for an ack back from the server,
           // then jut send the next one...
           if (thinkTime > 0) {
@@ -143,8 +144,8 @@ class StompRemoteProducer extends Remote
   }
 
   override def onConnected() = {
-    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
-    outboundSink.refiller = ^ { drain }
+    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink) {x => x})
+    outboundSink.refiller = ^ {drain}
 
     if (destination.getDomain() == Router.QUEUE_DOMAIN) {
       stompDestination = ascii("/queue/" + destination.getName().toString());
@@ -172,3 +173,30 @@ class StompRemoteProducer extends Remote
   }
 }
 
+trait Watchog extends RemoteConsumer {
+  var messageCount = 0
+
+  def watchdog(lastMessageCount: Int): Unit = {
+    val seconds = 10
+    dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
+      if (messageCount == lastMessageCount) {
+        warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
+        stop
+      } else {
+        watchdog(messageCount)
+      }
+    })
+  }
+
+  abstract override protected def messageReceived() = {
+    super.messageReceived
+    messageCount += 1
+  }
+
+  abstract override protected def onConnected() = {
+    super.onConnected
+    watchdog(messageCount)
+  }
+
+}
+

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala?rev=1023992&r1=1023991&r2=1023992&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/ServiceController.scala
Mon Oct 18 20:26:06 2010
@@ -23,21 +23,37 @@ import collection.mutable.ListBuffer
   Simple trait to cut down on the code necessary to manage BaseService instances
  */
 
-trait ServiceController {
+object ServiceControl {
 
   // start or stop a single service
-  def controlService(start: Boolean, service: Service, action: String) = {
+  private def controlService(start: Boolean, service: Service, action: String) = {
     val tracker = new LoggingTracker(action)
     if (start) tracker.start(service) else tracker.stop(service)
     tracker.await
   }
 
   // start or stop a bunch of services in one go
-  def controlServices(start: Boolean, services: ListBuffer[Service], action: String) = {
+  private def controlServices(start: Boolean, services: ListBuffer[Service], action: String)
= {
     val tracker = new LoggingTracker(action)
-    services.foreach((service: Service) => {if (start) tracker.start(service) else tracker.stop(service)})
+    services.foreach(service => {if (start) tracker.start(service) else tracker.stop(service)})
     tracker.await
   }
 
+  def start(services: ListBuffer[Service], action: String) = {
+    controlServices(true, services, action)
+  }
+
+  def stop(services: ListBuffer[Service], action: String) = {
+    controlServices(false, services, action)
+  }
+
+  def start(service: Service, action: String) = {
+    controlService(true, service, action)
+  }
+
+  def stop(service: Service, action: String) = {
+    controlService(false, service, action)
+  }
+
 
 }
\ No newline at end of file



Mime
View raw message