activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1023986 - in /activemq/activemq-apollo/trunk: apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/ apollo-util/src/main/scala/org/apache/activemq/apollo/uti...
Date Mon, 18 Oct 2010 20:25:23 GMT
Author: chirino
Date: Mon Oct 18 20:25:23 2010
New Revision: 1023986

URL: http://svn.apache.org/viewvc?rev=1023986&view=rev
Log:
Create initial DB with 1M messages, save it and re-use it, modified DeepQueueScenarios to
just push messages onto the queue and took out the fixed sampling. Still needs a bit of work.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
      - copied, changed from r1023948, activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
    activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1023986&r1=1023985&r2=1023986&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
Mon Oct 18 20:25:23 2010
@@ -211,6 +211,24 @@ abstract class BrokerPerfSupport extends
     config
   }
 
+  def createDestinations(destCount: Int) : Array[Destination] = {
+    var dests = new Array[Destination](destCount)
+
+    for (i <- 0 until destCount) {
+      val domain = if (PTP) {Router.QUEUE_DOMAIN} else {Router.TOPIC_DOMAIN}
+      val name = new AsciiBuffer("dest" + (i + 1))
+      var bean = new SingleDestination(domain, name)
+      dests(i) = bean
+      //        if (PTP) {
+      //          sendBroker.defaultVirtualHost.createQueue(dests(i))
+      //          if (MULTI_BROKER) {
+      //            rcvBroker.defaultVirtualHost.createQueue(dests(i))
+      //          }
+      //        }
+    }
+    dests
+  }
+
   def createConnections() = {
 
     if (MULTI_BROKER) {
@@ -229,24 +247,11 @@ abstract class BrokerPerfSupport extends
 
     startBrokers()
 
-    var dests = new Array[Destination](destCount)
-
-    for (i <- 0 until destCount) {
-      val domain = if (PTP) {Router.QUEUE_DOMAIN} else {Router.TOPIC_DOMAIN}
-      val name = new AsciiBuffer("dest" + (i + 1))
-      var bean = new SingleDestination(domain, name)
-      dests(i) = bean
-//        if (PTP) {
-//          sendBroker.defaultVirtualHost.createQueue(dests(i))
-//          if (MULTI_BROKER) {
-//            rcvBroker.defaultVirtualHost.createQueue(dests(i))
-//          }
-//        }
-    }
+    val dests: Array[Destination] = createDestinations(destCount)
 
     for (i <- 0 until producerCount) {
       var destination = dests(i % destCount)
-      var producer = _createProducer(i, destination)
+      var producer = _createProducer(i, MESSAGE_SIZE,  destination)
       producer.persistent = PERSISTENT
       producers += (producer)
     }
@@ -286,7 +291,7 @@ abstract class BrokerPerfSupport extends
   }
 
 
-  def _createProducer(id: Int, destination: Destination): RemoteProducer = {
+  def _createProducer(id: Int, messageSize: Int, destination: Destination): RemoteProducer
= {
     var producer = createProducer()
     producer.stopping = stopping
 
@@ -296,7 +301,7 @@ abstract class BrokerPerfSupport extends
     producer.destination = destination
     producer.messageIdGenerator = msgIdGenerator
     producer.rateAggregator = totalProducerRate
-    producer.payloadSize = MESSAGE_SIZE
+    producer.payloadSize = messageSize
     producer.init
     producer
   }
@@ -329,7 +334,6 @@ abstract class BrokerPerfSupport extends
     tracker.await
   }
 
-
   def startClients() = {
     var tracker = new LoggingTracker("test consumer startup")
     for (connection <- consumers) {
@@ -415,6 +419,25 @@ abstract class BrokerPerfSupport extends
 
 
 }
+
+trait FixedSampling extends BrokerPerfSupport {
+  val MIN_MESSAGES = 100000L
+
+  override def fixed_sampling = false
+
+  override def keep_sampling:Boolean = {
+    if( producerCount > 0 && totalMessageSent < MIN_MESSAGES ) {
+      println("Waiting for producers: %s/%s".format(totalMessageSent, MIN_MESSAGES));
+      return true
+    }
+    if ( consumerCount > 0 && totalMessageReceived < MIN_MESSAGES ) {
+      println("Waiting for consumers: %s/%s".format(totalMessageReceived, MIN_MESSAGES));
+      return true
+    }
+    return false
+  }
+}
+
 abstract class RemoteConnection extends Connection {
   var uri: String = null
   var name:String = null
@@ -425,8 +448,6 @@ abstract class RemoteConnection extends 
   var stopping:AtomicBoolean = null
   var destination: Destination = null
 
-  var messageCount = 0
-
   def init = {
     if( rate.getName == null ) {
       rate.name(name + " Rate")
@@ -472,13 +493,6 @@ abstract class RemoteConnection extends 
       }
     }
   }
-
-  protected def doStop()
-
-  protected def incrementMessageCount() = {
-    messageCount = messageCount + 1
-  }
-
 }
 
 abstract class RemoteConsumer extends RemoteConnection {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala?rev=1023986&r1=1023985&r2=1023986&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/DeepQueueScenarios.scala
Mon Oct 18 20:25:23 2010
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.broker.perf
 
-import java.net.URL
-
 /**
  * <p>
  * </p>
@@ -26,30 +24,9 @@ import java.net.URL
  */
 trait DeepQueueScenarios extends PersistentScenario {
 
-  PERSISTENT = true
-  val MIN_MESSAGES = 100000
-
-  override def fixed_sampling = false
-
-  override def keep_sampling:Boolean = {
-    if( producerCount > 0 && totalMessageSent < MIN_MESSAGES ) {
-      println("Waiting for producers: %s/%s".format(totalMessageSent, MIN_MESSAGES));
-      return true
-    }
-    if ( consumerCount > 0 && totalMessageReceived < MIN_MESSAGES ) {
-      println("Waiting for consumers: %s/%s".format(totalMessageReceived, MIN_MESSAGES));
-      return true
-    }
-    return false
-  }
-
-  override def reportResourceTemplate():URL = { classOf[DeepQueueScenarios].getResource("persistent-report.html")
}
-
-  //override def partitionedLoad = List(1, 2, 4, 8, 10)
   override def highContention = 100
-  //override def messageSizes = List(20, 1024, 1024*256)
 
-  for ( load <- partitionedLoad ; messageSize <- List(20,1024)  ) {
+  for ( count <- partitionedLoad ; messageSize <- messageSizes  ) {
 
     def benchmark(name: String)(func: => Unit) {
       test(name) {
@@ -59,12 +36,12 @@ trait DeepQueueScenarios extends Persist
         func
       }
     }
+    
+    val prefix = "queue " + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b"
) + " "
+    val suffix = "" //(if( durable ) " durable" else "")
 
-    val info = "queue " + MIN_MESSAGES + " " + (if((messageSize%1024)==0) (messageSize/1024)+"k"
else messageSize+"b" ) + " with " + load + " "
-
-    benchmark("En" + info + "producer(s)") {
-      PURGE_STORE = true
-      producerCount = load;
+    benchmark(format("%s%d%s", prefix, count, suffix)) {
+      producerCount = count;
       createConnections();
 
       // Start 'em up.
@@ -74,23 +51,6 @@ trait DeepQueueScenarios extends Persist
       } finally {
         stopServices();
       }
-      this.assert(totalMessageSent > MIN_MESSAGES, "Unexpected number of messages sent!")
-    }
-
-    benchmark("De" + info + "consumer(s)") {
-      PURGE_STORE = false
-      consumerCount = load;
-      createConnections();
-
-      // Start 'em up.
-      startClients();
-      try {
-        reportRates();
-      } finally {
-        stopServices();
-      }
-      this.assert(totalMessageReceived > MIN_MESSAGES, "Unexpected number of messages
received!")
     }
   }
-
 }
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala?rev=1023986&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/LargeInitialDB.scala
Mon Oct 18 20:25:23 2010
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.activemq.apollo.broker.perf
+
+import org.apache.activemq.apollo.broker.{Destination, Broker}
+import tools.nsc.io.Directory
+import org.apache.activemq.apollo.util.metric.MetricAggregator
+import org.apache.activemq.apollo.util.{FileSupport, LoggingTracker}
+
+
+trait LargeInitialDB extends PersistentScenario {
+
+  PURGE_STORE = false
+
+  var original: Directory = null
+  var backup: Directory = null;
+
+  // delete existing data file and copy new data file over
+  override protected def beforeEach() = {
+    println("Restoring DB")
+    restoreDB
+    super.beforeEach
+  }
+
+  // start a broker connect a producer and dump a bunch of messages
+  // into a destination
+  override protected def beforeAll(configMap: Map[String, Any]) = {
+    super.beforeAll(configMap)
+
+    sendBroker = new Broker()
+    sendBroker.config = createBrokerConfig("Broker", sendBrokerBindURI, sendBrokerConnectURI)
+    val store = sendBroker.config.virtual_hosts.get(0).store
+
+    original = new Directory(storeDirectory)
+    if ( original.exists ) {
+      original.deleteRecursively
+      original.createDirectory(true)      
+    }
+    val backupLocation = FileSupport.toDirectory(storeDirectory.getParent)./(FileSupport.toDirectory("backup"))
+    backup = backupLocation
+    cleanBackup
+
+    println("Using store at " + original + " and backup at " + backup)
+
+    var tracker = new LoggingTracker("initial db broker startup")
+    tracker.start(sendBroker)
+    tracker.await
+
+    PTP = true
+    val dests: Array[Destination] = createDestinations(1)
+    totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items")
   
+    val producer: RemoteProducer = _createProducer(0, 1024, dests(0))
+    producer.persistent = true
+
+    tracker = new LoggingTracker("initial db producer startup")
+    tracker.start(producer)
+    tracker.await
+
+    val messages = 1000000L
+
+    println("Filling broker with " + messages + " 1k messages")
+    while (producer.rate.counter() < messages) {
+      println("Waiting for producer " + producer.rate.counter() + "/" + messages)
+      Thread.sleep(5000)
+    }
+
+    tracker = new LoggingTracker("producer shutdown")
+    tracker.stop(producer)
+    tracker.await
+    tracker = new LoggingTracker("broker shutdown")
+    tracker.stop(sendBroker)
+    tracker.await
+
+    Thread.sleep(10000)
+
+    saveDB
+  }
+
+  def saveDB {
+    println("Copying contents of " + original + " to " + backup)
+    cleanBackup
+    FileSupport.recursiveCopy(original, backup)
+    printStores
+  }
+
+  def printStores {
+    println("\nOriginal store")
+    original.deepList().foreach(println)
+    println("\n\nBackup store")
+    backup.deepList().foreach(println)
+  }
+
+  def restoreDB {
+    original.deleteRecursively
+    println("Copying contents of " + backup + " to " + original)
+    FileSupport.recursiveCopy(backup, original)
+    printStores
+  }
+
+  def cleanBackup {
+    if (backup.exists) {
+      backup.deleteRecursively
+    }
+    backup.createDirectory(true)
+    printStores
+  }
+
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala?rev=1023986&r1=1023985&r2=1023986&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/PersistentScenario.scala
Mon Oct 18 20:25:23 2010
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.apollo.broker.perf
 
+import java.net.URL
+import java.io.File
+
 /**
  * <p>
  * </p>
@@ -24,6 +27,9 @@ package org.apache.activemq.apollo.broke
  */
 trait PersistentScenario extends BrokerPerfSupport {
 
+  var storeDirectory: File = null
+
+  override def reportResourceTemplate():URL = { classOf[PersistentScenario].getResource("persistent-report.html")
}
   PERSISTENT = true
 
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=1023986&r1=1023985&r2=1023986&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
Mon Oct 18 20:25:23 2010
@@ -16,19 +16,7 @@
  */
 package org.apache.activemq.apollo.stomp.perf
 
-import _root_.java.util.concurrent.TimeUnit
-import _root_.org.apache.activemq.apollo.broker._
 import _root_.org.apache.activemq.apollo.broker.perf._
-import _root_.org.apache.activemq.apollo.stomp._
-import _root_.org.apache.activemq.apollo.util._
-
-import _root_.org.fusesource.hawtbuf._
-import collection.mutable.{ListBuffer, HashMap}
-
-import AsciiBuffer._
-import Stomp._
-import _root_.org.apache.activemq.apollo.stomp.StompFrame
-import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 import java.io.File
 import org.apache.activemq.apollo.dto.{BrokerDTO, HawtDBStoreDTO}
 import org.apache.activemq.apollo.store.bdb.dto.BDBStoreDTO
@@ -42,11 +30,11 @@ class BasicHawtDBTest extends BasicScena
   override def description = "Using the STOMP protocol over TCP"
 }
 
-class DeepQueueHawtDBTest extends DeepQueueScenarios with HawtDBScenario with StompScenario
{
+class DeepQueueHawtDBTest extends DeepQueueScenarios with LargeInitialDB with HawtDBScenario
with StompScenario {
   override def description = "Using the STOMP protocol over TCP persisting to the HawtDB
store."
 }
 
-class DeepQueueBDBTest extends DeepQueueScenarios with BDBScenario with StompScenario {
+class DeepQueueBDBTest extends DeepQueueScenarios with LargeInitialDB with BDBScenario with
StompScenario {
   override def description = "Using the STOMP protocol over TCP persisting to the BerkleyDB
store."
 }
 
@@ -56,200 +44,25 @@ trait StompScenario extends BrokerPerfSu
   override def getRemoteProtocolName() = "stomp"
 }
 
-trait HawtDBScenario extends BrokerPerfSupport {
+trait HawtDBScenario extends PersistentScenario {
   override def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO
= {
     val rc = super.createBrokerConfig(name, bindURI, connectUri)
     val store = new HawtDBStoreDTO
-    store.directory = new File(new File(testDataDir, getClass.getName), name)
+    storeDirectory = new File(new File(testDataDir, getClass.getName), name)
+    store.directory = storeDirectory
     rc.virtual_hosts.get(0).store = store
     rc
   }
 }
-trait BDBScenario extends BrokerPerfSupport {
+
+trait BDBScenario extends PersistentScenario {
   override def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO
= {
     val rc = super.createBrokerConfig(name, bindURI, connectUri)
-
     val store = new BDBStoreDTO
-    store.directory = new File(new File(testDataDir, getClass.getName), name)
-
+    storeDirectory = new File(new File(testDataDir, getClass.getName), name)
+    store.directory = storeDirectory
     rc.virtual_hosts.get(0).store = store
     rc
   }
 }
 
-class StompRemoteConsumer extends RemoteConsumer with Logging {
-  var outboundSink: OverflowSink[StompFrame] = null
-
-  def watchdog(lastMessageCount: Int) : Unit = {
-    val seconds = 10
-    dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
-          if (messageCount == lastMessageCount) {
-            warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
-            stop
-          } else {
-            watchdog(messageCount)
-          }
-        })
-  }
-
-  def onConnected() = {
-    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
-    outboundSink.refiller = ^{}
-
-    val stompDestination = if (destination.getDomain() == Router.QUEUE_DOMAIN) {
-      ascii("/queue/" + destination.getName().toString());
-    } else {
-      ascii("/topic/" + destination.getName().toString());
-    }
-
-    var frame = StompFrame(CONNECT);
-    outboundSink.offer(frame);
-
-    var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-    headers ::= (DESTINATION, stompDestination)
-    headers ::= (ID, ascii("stomp-sub-" + name))
-
-    if( persistent ) {
-      headers ::= (ACK_MODE, CLIENT)
-    }
-
-    frame = StompFrame(SUBSCRIBE, headers);
-    outboundSink.offer(frame);
-    watchdog(messageCount)
-  }
-
-  override def onTransportCommand(command: Object) = {
-    var frame = command.asInstanceOf[StompFrame]
-    frame match {
-      case StompFrame(CONNECTED, headers, _, _) =>
-      case StompFrame(MESSAGE, headers, content, _) =>
-          messageReceived();
-
-          // we client ack if persistent messages are being used.
-          if( persistent ) {
-            var rc = List((MESSAGE_ID, frame.header(MESSAGE_ID)))
-            outboundSink.offer(StompFrame(ACK, rc));
-          }
-
-      case StompFrame(ERROR, headers, content, _) =>
-        onFailure(new Exception("Server reported an error: " + frame.content));
-      case _ =>
-        onFailure(new Exception("Unexpected stomp command: " + frame.action));
-    }
-  }
-
-  protected def messageReceived() {
-      if (thinkTime > 0) {
-        transport.suspendRead
-        dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
-          incrementMessageCount          
-          rate.increment();
-          if (!stopped) {
-            transport.resumeRead
-          }
-        })
-      } else {
-        incrementMessageCount
-        rate.increment
-      }
-  }
-
-  override def doStop() = {
-    outboundSink.offer(StompFrame(DISCONNECT));
-    dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
-        transport.stop
-        stop
-      })
-  }
-}
-
-class StompRemoteProducer extends RemoteProducer with Logging {
-  var outboundSink: OverflowSink[StompFrame] = null
-  var stompDestination: AsciiBuffer = null
-  var frame:StompFrame = null
-
-  def send_next: Unit = {
-      var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-      headers ::= (DESTINATION, stompDestination);
-      if (property != null) {
-        headers ::= (ascii(property), ascii(property));
-      }
-      if( persistent ) {
-        headers ::= ((RECEIPT_REQUESTED, ascii("x")));
-      }
-      //    var p = this.priority;
-      //    if (priorityMod > 0) {
-      //        p = if ((counter % priorityMod) == 0) { 0 } else { priority }
-      //    }
-
-      var content = ascii(createPayload());
-      frame = StompFrame(SEND, headers, BufferContent(content))
-      drain()
-  }
-
-  def drain() = {
-    if( frame!=null ) {
-      if( !outboundSink.full ) {
-        outboundSink.offer(frame)
-        frame = null
-        rate.increment
-        val task = ^ {
-          if (!stopped) {
-            incrementMessageCount
-            send_next
-          }
-        }
-
-        if( !persistent ) {
-          // if we are not going to wait for an ack back from the server,
-          // then jut send the next one...
-          if (thinkTime > 0) {
-            dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
-          } else {
-            dispatchQueue << task
-          }
-        }
-      }
-    }
-  }
-
-  override def onConnected() = {
-    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
-    outboundSink.refiller = ^ { drain }
-
-    if (destination.getDomain() == Router.QUEUE_DOMAIN) {
-      stompDestination = ascii("/queue/" + destination.getName().toString());
-    } else {
-      stompDestination = ascii("/topic/" + destination.getName().toString());
-    }
-    outboundSink.offer(StompFrame(CONNECT));
-    send_next
-  }
-
-  override def onTransportCommand(command: Object) = {
-    var frame = command.asInstanceOf[StompFrame]
-    frame match {
-      case StompFrame(RECEIPT, headers, _, _) =>
-        assert( persistent )
-        // we got the ack for the previous message we sent.. now send the next one.
-        incrementMessageCount
-        send_next
-
-      case StompFrame(CONNECTED, headers, _, _) =>
-      case StompFrame(ERROR, headers, content, _) =>
-        onFailure(new Exception("Server reported an error: " + frame.content.utf8));
-      case _ =>
-        onFailure(new Exception("Unexpected stomp command: " + frame.action));
-    }
-  }
-
-  override def doStop() = {
-    outboundSink.offer(StompFrame(DISCONNECT));
-    dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
-        transport.stop
-        stop
-      })
-  }  
-
-}
-

Copied: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
(from r1023948, activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?p2=activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala&p1=activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala&r1=1023948&r2=1023986&rev=1023986&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
Mon Oct 18 20:25:23 2010
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.activemq.apollo.stomp.perf
 
 import _root_.java.util.concurrent.TimeUnit
@@ -23,62 +24,15 @@ import _root_.org.apache.activemq.apollo
 import _root_.org.apache.activemq.apollo.util._
 
 import _root_.org.fusesource.hawtbuf._
-import collection.mutable.{ListBuffer, HashMap}
 
 import AsciiBuffer._
 import Stomp._
 import _root_.org.apache.activemq.apollo.stomp.StompFrame
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import java.io.File
-import org.apache.activemq.apollo.dto.{BrokerDTO, HawtDBStoreDTO}
-import org.apache.activemq.apollo.store.bdb.dto.BDBStoreDTO
-
-
-class BasicNonPersistentTest extends BasicScenarios with StompScenario {
-  override def description = "Using the STOMP protocol over TCP"
-}
-
-class BasicHawtDBTest extends BasicScenarios with PersistentScenario with HawtDBScenario
with StompScenario {
-  override def description = "Using the STOMP protocol over TCP"
-}
-
-class DeepQueueHawtDBTest extends DeepQueueScenarios with HawtDBScenario with StompScenario
{
-  override def description = "Using the STOMP protocol over TCP persisting to the HawtDB
store."
-}
-
-class DeepQueueBDBTest extends DeepQueueScenarios with BDBScenario with StompScenario {
-  override def description = "Using the STOMP protocol over TCP persisting to the BerkleyDB
store."
-}
-
-trait StompScenario extends BrokerPerfSupport {
-  override def createProducer() = new StompRemoteProducer()
-  override def createConsumer() = new StompRemoteConsumer()
-  override def getRemoteProtocolName() = "stomp"
-}
-
-trait HawtDBScenario extends BrokerPerfSupport {
-  override def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO
= {
-    val rc = super.createBrokerConfig(name, bindURI, connectUri)
-    val store = new HawtDBStoreDTO
-    store.directory = new File(new File(testDataDir, getClass.getName), name)
-    rc.virtual_hosts.get(0).store = store
-    rc
-  }
-}
-trait BDBScenario extends BrokerPerfSupport {
-  override def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO
= {
-    val rc = super.createBrokerConfig(name, bindURI, connectUri)
-
-    val store = new BDBStoreDTO
-    store.directory = new File(new File(testDataDir, getClass.getName), name)
-
-    rc.virtual_hosts.get(0).store = store
-    rc
-  }
-}
 
 class StompRemoteConsumer extends RemoteConsumer with Logging {
   var outboundSink: OverflowSink[StompFrame] = null
+  var messageCount = 0
 
   def watchdog(lastMessageCount: Int) : Unit = {
     val seconds = 10
@@ -102,18 +56,18 @@ class StompRemoteConsumer extends Remote
       ascii("/topic/" + destination.getName().toString());
     }
 
-    var frame = StompFrame(CONNECT);
+    var frame = StompFrame(Stomp.Commands.CONNECT);
     outboundSink.offer(frame);
 
     var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-    headers ::= (DESTINATION, stompDestination)
-    headers ::= (ID, ascii("stomp-sub-" + name))
+    headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
+    headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-" + name))
 
     if( persistent ) {
-      headers ::= (ACK_MODE, CLIENT)
+      headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT)
     }
 
-    frame = StompFrame(SUBSCRIBE, headers);
+    frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
     outboundSink.offer(frame);
     watchdog(messageCount)
   }
@@ -121,17 +75,17 @@ class StompRemoteConsumer extends Remote
   override def onTransportCommand(command: Object) = {
     var frame = command.asInstanceOf[StompFrame]
     frame match {
-      case StompFrame(CONNECTED, headers, _, _) =>
-      case StompFrame(MESSAGE, headers, content, _) =>
+      case StompFrame(Responses.CONNECTED, headers, _, _) =>
+      case StompFrame(Responses.MESSAGE, headers, content, _) =>
           messageReceived();
 
           // we client ack if persistent messages are being used.
           if( persistent ) {
-            var rc = List((MESSAGE_ID, frame.header(MESSAGE_ID)))
-            outboundSink.offer(StompFrame(ACK, rc));
+            var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID)))
+            outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
           }
 
-      case StompFrame(ERROR, headers, content, _) =>
+      case StompFrame(Responses.ERROR, headers, content, _) =>
         onFailure(new Exception("Server reported an error: " + frame.content));
       case _ =>
         onFailure(new Exception("Unexpected stomp command: " + frame.action));
@@ -142,27 +96,20 @@ class StompRemoteConsumer extends Remote
       if (thinkTime > 0) {
         transport.suspendRead
         dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
-          incrementMessageCount          
+          messageCount += 1
           rate.increment();
           if (!stopped) {
             transport.resumeRead
           }
         })
       } else {
-        incrementMessageCount
+        messageCount += 1
         rate.increment
       }
   }
-
-  override def doStop() = {
-    outboundSink.offer(StompFrame(DISCONNECT));
-    dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
-        transport.stop
-        stop
-      })
-  }
 }
 
+
 class StompRemoteProducer extends RemoteProducer with Logging {
   var outboundSink: OverflowSink[StompFrame] = null
   var stompDestination: AsciiBuffer = null
@@ -170,12 +117,12 @@ class StompRemoteProducer extends Remote
 
   def send_next: Unit = {
       var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-      headers ::= (DESTINATION, stompDestination);
+      headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
       if (property != null) {
         headers ::= (ascii(property), ascii(property));
       }
       if( persistent ) {
-        headers ::= ((RECEIPT_REQUESTED, ascii("x")));
+        headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x")));
       }
       //    var p = this.priority;
       //    if (priorityMod > 0) {
@@ -183,7 +130,7 @@ class StompRemoteProducer extends Remote
       //    }
 
       var content = ascii(createPayload());
-      frame = StompFrame(SEND, headers, BufferContent(content))
+      frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
       drain()
   }
 
@@ -195,7 +142,6 @@ class StompRemoteProducer extends Remote
         rate.increment
         val task = ^ {
           if (!stopped) {
-            incrementMessageCount
             send_next
           }
         }
@@ -222,34 +168,24 @@ class StompRemoteProducer extends Remote
     } else {
       stompDestination = ascii("/topic/" + destination.getName().toString());
     }
-    outboundSink.offer(StompFrame(CONNECT));
+    outboundSink.offer(StompFrame(Stomp.Commands.CONNECT));
     send_next
   }
 
   override def onTransportCommand(command: Object) = {
     var frame = command.asInstanceOf[StompFrame]
     frame match {
-      case StompFrame(RECEIPT, headers, _, _) =>
+      case StompFrame(Responses.RECEIPT, headers, _, _) =>
         assert( persistent )
         // we got the ack for the previous message we sent.. now send the next one.
-        incrementMessageCount
         send_next
 
-      case StompFrame(CONNECTED, headers, _, _) =>
-      case StompFrame(ERROR, headers, content, _) =>
+      case StompFrame(Responses.CONNECTED, headers, _, _) =>
+      case StompFrame(Responses.ERROR, headers, content, _) =>
         onFailure(new Exception("Server reported an error: " + frame.content.utf8));
       case _ =>
         onFailure(new Exception("Unexpected stomp command: " + frame.action));
     }
   }
-
-  override def doStop() = {
-    outboundSink.offer(StompFrame(DISCONNECT));
-    dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
-        transport.stop
-        stop
-      })
-  }  
-
 }
 

Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala?rev=1023986&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/FileSupport.scala
Mon Oct 18 20:25:23 2010
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.util
+
+import tools.nsc.io.{File, Path, Directory}
+
+object FileSupport {
+
+  def recursiveCopy(source: Path, target: Path) : Unit = {
+    require(source.isDirectory, source.path + " must be a directory.")
+    if ( !target.exists ) {
+      target.toDirectory.createDirectory()
+    }
+
+    def createOrCopy(file: Path) : Unit = {
+      val newTarget = target / FileSupport.toDirectory(file.name)
+      if (file.isDirectory) {
+        recursiveCopy(file.toDirectory, newTarget)
+      } else {
+        file.toFile.copyTo(newTarget)
+      }
+    }
+    source.toDirectory.list.foreach(createOrCopy)
+  }
+
+  def toDirectory(name: String) : Directory = {
+    new Directory(new java.io.File(name))
+  }
+
+  def toFile(name: String) : File = {
+    new File(new java.io.File(name))
+  }
+  
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala?rev=1023986&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FileSupportTest.scala
Mon Oct 18 20:25:23 2010
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.activemq.apollo.util
+
+import org.scalatest.matchers.ShouldMatchers
+import tools.nsc.io.{Path, File, Directory}
+import collection.mutable.ArrayBuffer
+
+class FileSupportTest extends FunSuiteSupport with ShouldMatchers {
+
+  test("recursive file copy test") {
+
+    val base = new Directory(baseDir)
+    var target = base / FileSupport.toDirectory("target")
+
+    val sourceDir: Directory = target / FileSupport.toDirectory("sourceDir")
+    if ( sourceDir.exists ) {
+      sourceDir.deleteRecursively
+    }
+    sourceDir.createDirectory(false)
+
+    val subDir: Directory = sourceDir / FileSupport.toDirectory("subDir")
+    subDir.createDirectory(false)
+
+    val someFile: File = subDir / FileSupport.toFile("someFile")
+    someFile.createFile(false)
+
+    val targetDir: Directory = target / FileSupport.toDirectory("targetDir")
+    if ( targetDir.exists ) {
+      targetDir.deleteRecursively
+    }
+
+    FileSupport.recursiveCopy(sourceDir, targetDir)
+
+    val listing = new ArrayBuffer[String]
+
+    targetDir.deepList().foreach(file => listing.append(file.toString))
+
+    listing should contain("./target/targetDir/subDir")
+    listing should contain("./target/targetDir/subDir/someFile")
+
+  }
+
+  
+}
\ No newline at end of file



Mime
View raw message