Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 35139 invoked from network); 7 Jul 2010 03:41:24 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 7 Jul 2010 03:41:24 -0000 Received: (qmail 32089 invoked by uid 500); 7 Jul 2010 03:41:24 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 32029 invoked by uid 500); 7 Jul 2010 03:41:23 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 31956 invoked by uid 99); 7 Jul 2010 03:41:23 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:41:23 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:41:15 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 2A37F2388A64; Wed, 7 Jul 2010 03:40:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r961068 [4/4] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/main/java/org/apache... Date: Wed, 07 Jul 2010 03:40:20 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100707034021.2A37F2388A64@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/SimplePathFilter.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/SimplePathFilter.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty) Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty) Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala?rev=961068&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (added) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala Wed Jul 7 03:40:18 2010 @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.transport.vm + +import _root_.java.io.IOException +import _root_.java.net.URI +import _root_.java.util.concurrent.atomic.AtomicBoolean +import _root_.java.util.concurrent.atomic.AtomicInteger + +import _root_.org.apache.activemq.apollo.broker._ +import _root_.org.apache.activemq.transport.Transport +import _root_.org.apache.activemq.transport.TransportFactory +import _root_.org.apache.activemq.transport.TransportServer +import _root_.org.apache.activemq.transport.pipe.PipeTransport +import _root_.org.apache.activemq.transport.pipe.PipeTransportFactory +import _root_.org.apache.activemq.transport.pipe.PipeTransportServer +import _root_.org.apache.activemq.util.IOExceptionSupport +import _root_.org.apache.activemq.util.URISupport +import _root_.org.apache.activemq.transport.TransportFactorySupport.configure +import _root_.org.apache.activemq.transport.TransportFactorySupport.verify + +import _root_.scala.collection.JavaConversions._ + +object VMTransportFactory extends Log { + val DEFAULT_PIPE_NAME = BrokerConstants.DEFAULT_VIRTUAL_HOST_NAME.toString(); +} + +/** + * Implements the vm transport which behaves like the pipe transport except that + * it can start embedded brokers up on demand. + * + * @author chirino + * + */ +class VMTransportFactory extends PipeTransportFactory with Logging { + + import PipeTransportFactory._ + import VMTransportFactory._ + override protected def log = VMTransportFactory + + /** + * This extension of the PipeTransportServer shuts down the broker + * when all the connections are disconnected. + * + * @author chirino + */ + class VmTransportServer extends PipeTransportServer { + + val refs = new AtomicInteger() + var broker:Broker = null + + override def createClientTransport():PipeTransport = { + refs.incrementAndGet(); + new PipeTransport(this) { + + val stopped = new AtomicBoolean() + + override def stop() = { + if( stopped.compareAndSet(false, true) ) { + super.stop(); + if( refs.decrementAndGet() == 0 ) { + stopBroker(); + } + } + } + }; + } + + def setBroker(broker:Broker) = { + this.broker = broker; + } + + def stopBroker() = { + try { + this.broker.stop(); + unbind(this); + } catch { + case e:Exception=> + error("Failed to stop the broker gracefully: "+e); + debug("Failed to stop the broker gracefully: ", e); + } + } + } + + override def bind(uri:URI):TransportServer = { + new VmTransportServer(); + } + + override def connect(location:URI):Transport = { + try { + + var brokerURI:String = null; + var create = true; + var name = location.getHost(); + if (name == null) { + name = DEFAULT_PIPE_NAME; + } + + var options = URISupport.parseParamters(location); + var config = options.remove("broker").asInstanceOf[String] + if (config != null) { + brokerURI = config; + } + if ("false".equals(options.remove("create"))) { + create = false; + } + + + var server = servers.get(name); + if (server == null && create) { + + // Create the broker on demand. + var broker = if( brokerURI == null ) { + new Broker() + } else { + BrokerFactory.createBroker(brokerURI); + } + + // Remove the existing pipe severs if the broker is configured with one... we want to make sure it + // uses the one we explicitly configure here. + for (s <- broker.transportServers ) { + if (s.isInstanceOf[PipeTransportServer] && name == s.asInstanceOf[PipeTransportServer].getName()) { + broker.transportServers.remove(s); + } + } + + // We want to use a vm transport server impl. + var vmTransportServer = TransportFactory.bind(new URI("vm://" + name+"?wireFormat=null")).asInstanceOf[VmTransportServer] + vmTransportServer.setBroker(broker); + broker.transportServers.add(vmTransportServer); + broker.start(); + + server = servers.get(name); + } + + if (server == null) { + throw new IOException("Server is not bound: " + name); + } + + var transport = server.connect(); + verify( configure(transport, options), options); + + } catch { +// case e:URISyntaxException=> +// throw IOExceptionSupport.create(e); + case e:Exception=> + throw IOExceptionSupport.create(e); + } + } + +} Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/package.html) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/package.html&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty) Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty) Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapTest.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapTest.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty) Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty) Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala?rev=961068&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala (added) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala Wed Jul 7 03:40:18 2010 @@ -0,0 +1,698 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker + +import _root_.java.beans.ExceptionListener +import _root_.java.io.{File} +import _root_.java.net.URI +import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import _root_.java.util.{LinkedHashMap, ArrayList, HashMap} +import _root_.org.apache.activemq.apollo.broker._ +import _root_.org.apache.activemq.broker.store.{StoreFactory, Store} +import _root_.org.apache.activemq.metric.{Period, MetricAggregator, MetricCounter} +import _root_.java.lang.{String} +import _root_.org.apache.activemq.util.buffer.{AsciiBuffer} +import _root_.org.junit.{Test, Before} + +import org.apache.activemq.transport.TransportFactory + +import _root_.scala.collection.JavaConversions._ + + +abstract class RemoteConsumer extends Connection { + + val consumerRate = new MetricCounter(); + var totalConsumerRate : MetricAggregator = null + var thinkTime:Long = 0 + var destination:Destination = null + var selector:String = null; + var durable = false; + var uri:URI = null + + private var schedualWait = false; + + override def start() = { + consumerRate.name("Consumer " + name + " Rate"); + totalConsumerRate.add(consumerRate); + transport = TransportFactory.connect(uri); + schedualWait = true; +// initialize(); + super.start(); + setupSubscription(); + } + + + protected def setupSubscription() + + protected def messageReceived( elem:MessageDelivery ) { +// TODO: +// if( schedualWait ) { +// if (thinkTime > 0) { +// dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, new Runnable(){ +// public void run() { +// consumerRate.increment(); +// controller.elementDispatched(elem); +// } +// }); +// +// } +// else +// { +// consumerRate.increment(); +// controller.elementDispatched(elem); +// } +// +// } else { +// if( thinkTime>0 ) { +// try { +// Thread.sleep(thinkTime); +// } catch (InterruptedException e) { +// } +// } +// consumerRate.increment(); +// controller.elementDispatched(elem); +// } + } + +} + + +abstract class RemoteProducer extends Connection { + + val rate = new MetricCounter(); + + var messageIdGenerator:AtomicLong = null + var priority = 0 + var persistentDelivery = false + var priorityMod = 0 + var counter = 0 + var producerId = 0 + var destination:Destination =null + var property:String = null + var totalProducerRate:MetricAggregator = null + var next:MessageDelivery = null + + var filler:String = null + var payloadSize = 20 + var uri:URI = null + +// TODO: +// protected IFlowController outboundController; +// protected IFlowSink outboundQueue; + + + override def start() = { + + if( payloadSize>0 ) { + var sb = new StringBuilder(payloadSize); + for( i <- 0 until payloadSize) { + sb.append(('a'+(i%26)).toChar); + } + filler = sb.toString(); + } + + rate.name("Producer " + name + " Rate"); + totalProducerRate.add(rate); + + + transport = TransportFactory.connect(uri); +// initialize(); + super.start(); + + setupProducer(); + + } + + def dispatch() = { +// TODO: +// while(true) +// { +// +// if(next == null) +// { +// createNextMessage(); +// } +// +// //If flow controlled stop until flow control is lifted. +// if(outboundController.isSinkBlocked()) +// { +// if(outboundController.addUnblockListener(this)) +// { +// return; +// } +// } +// +// outboundQueue.add(next, null); +// rate.increment(); +// next = null; +// } + } + + def setupProducer() + + def createNextMessage() + +// public void onFlowUnblocked(ISinkController controller) { +// dispatchQueue.dispatchAsync(dispatchTask); +// } + + def createPayload():String = { + if( payloadSize>=0 ) { + var sb = new StringBuilder(payloadSize); + sb.append(name); + sb.append(':'); + counter += 1 + sb.append(counter); + sb.append(':'); + var length = sb.length; + if( length <= payloadSize ) { + sb.append(filler.subSequence(0, payloadSize-length)); + return sb.toString(); + } else { + return sb.substring(0, payloadSize); + } + } else { + counter += 1 + return name+":"+(counter); + } + } + +} + +object BrokerTestBase { + + var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3")) + var IO_WORK_AMOUNT = 0 + var FANIN_COUNT = 10 + var FANOUT_COUNT = 10 + var PRIORITY_LEVELS = 10 + var USE_INPUT_QUEUES = true + + var USE_KAHA_DB = true; + var PURGE_STORE = true; + var PERSISTENT = false; + var DURABLE = false; + +} +abstract class BrokerTestBase { + import BrokerTestBase._ + + // Set to put senders and consumers on separate brokers. + protected var multibroker = false; + + // Set to mockup up ptp: + protected var ptp = false; + + // Set to use tcp IO + protected var tcp = true; + // set to force marshalling even in the NON tcp case. + protected var forceMarshalling = true; + + protected var sendBrokerBindURI:String=null + protected var receiveBrokerBindURI:String=null + protected var sendBrokerConnectURI:String=null + protected var receiveBrokerConnectURI:String=null + + protected var producerCount=0 + protected var consumerCount=0 + protected var destCount=0 + + protected val totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items") + protected val totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items") + + protected var sendBroker:Broker=null + protected var rcvBroker:Broker=null + protected val brokers = new ArrayList[Broker]() + protected val msgIdGenerator = new AtomicLong() + protected val stopping = new AtomicBoolean() + + val producers = new ArrayList[RemoteProducer]() + val consumers = new ArrayList[RemoteConsumer]() + var name:String=null; + + @Before + def setUp() = { + if (tcp) { + sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat(); + receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + getBrokerWireFormat(); + + sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=" + getRemoteWireFormat(); + receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=" + getRemoteWireFormat(); + } else { + sendBrokerConnectURI = "pipe://SendBroker"; + receiveBrokerConnectURI = "pipe://ReceiveBroker"; + if (forceMarshalling) { + sendBrokerBindURI = sendBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat(); + receiveBrokerBindURI = receiveBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat(); + } else { + sendBrokerBindURI = sendBrokerConnectURI; + receiveBrokerBindURI = receiveBrokerConnectURI; + } + } + } + + def setName(name:String) = { + if( this.name==null ) { + this.name = name; + } + } + + def getName() = name + + def getBrokerWireFormat() = "multi" + + def getRemoteWireFormat():String + + @Test + def benchmark_1_1_0():Unit = { + setName("1 producer -> 1 destination -> 0 consumers"); + if (ptp) { + return; + } + producerCount = 1; + destCount = 1; + + createConnections(); + + // Start 'em up. + startClients(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + @Test + def benchmark_1_1_1() = { + setName("1 producer -> 1 destination -> 1 consumers"); + producerCount = 1; + destCount = 1; + consumerCount = 1; + + createConnections(); + + // Start 'em up. + startClients(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + @Test + def benchmark_10_1_10() = { + setName(format("%d producers -> 1 destination -> %d consumers", FANIN_COUNT, FANOUT_COUNT)); + producerCount = FANIN_COUNT; + consumerCount = FANOUT_COUNT; + destCount = 1; + + createConnections(); + + // Start 'em up. + startClients(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + @Test + def benchmark_10_1_1() = { + setName(format("%d producers -> 1 destination -> 1 consumer", FANIN_COUNT)); + producerCount = FANIN_COUNT; + destCount = 1; + consumerCount = 1; + + createConnections(); + + // Start 'em up. + startClients(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + @Test + def benchmark_1_1_10() = { + setName(format("1 producer -> 1 destination -> %d consumers", FANOUT_COUNT)); + producerCount = 1; + destCount = 1; + consumerCount = FANOUT_COUNT; + + createConnections(); + + // Start 'em up. + startClients(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + @Test + def benchmark_2_2_2() = { + setName(format("2 producer -> 2 destination -> 2 consumers")); + producerCount = 2; + destCount = 2; + consumerCount = 2; + + createConnections(); + + // Start 'em up. + startClients(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + @Test + def benchmark_10_10_10() = { + setName(format("10 producers -> 10 destinations -> 10 consumers")); + producerCount = 10; + destCount = 10; + consumerCount = 10; + + createConnections(); + + // Start 'em up. + startClients(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + /** + * Tests 2 producers sending to 1 destination with 2 consumres, but with + * consumers set to select only messages from each producer. 1 consumers is + * set to slow, the other producer should be able to send quickly. + * + * @throws Exception + */ + @Test + def benchmark_2_2_2_SlowConsumer() = { + setName(format("2 producer -> 2 destination -> 2 slow consumers")); + producerCount = 2; + destCount = 2; + consumerCount = 2; + + createConnections(); + consumers.get(0).thinkTime = 50; + + // Start 'em up. + startClients(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + @Test + def benchmark_2_2_2_Selector()= { + setName(format("2 producer -> 2 destination -> 2 selector consumers")); + producerCount = 2; + destCount = 2; + consumerCount = 2; + + createConnections(); + + // Add properties to match producers to their consumers + for (i <- 0 until consumerCount) { + var property = "match" + i; + consumers.get(i).selector = property; + producers.get(i).property = property; + } + + // Start 'em up. + startClients(); + try { + reportRates(); + } finally { + stopServices(); + } + } + + /** + * Test sending with 1 high priority sender. The high priority sender should + * have higher throughput than the other low priority senders. + * + * @throws Exception + */ + @Test + def benchmark_2_1_1_HighPriorityProducer() = { + + setName(format("1 high and 1 normal priority producer -> 1 destination -> 1 consumer")); + producerCount = 2; + destCount = 1; + consumerCount = 1; + + createConnections(); + var producer = producers.get(0); + producer.priority = 1 + producer.rate.setName("High Priority Producer Rate"); + + consumers.get(0).thinkTime = 1; + + // Start 'em up. + startClients(); + try { + + System.out.println("Checking rates for test: " + getName()); + for (i <- 0 until PERFORMANCE_SAMPLES) { + var p = new Period(); + Thread.sleep(1000 * 5); + System.out.println(producer.rate.getRateSummary(p)); + System.out.println(totalProducerRate.getRateSummary(p)); + System.out.println(totalConsumerRate.getRateSummary(p)); + totalProducerRate.reset(); + totalConsumerRate.reset(); + } + + } finally { + stopServices(); + } + } + + /** + * Test sending with 1 high priority sender. The high priority sender should + * have higher throughput than the other low priority senders. + * + * @throws Exception + */ + @Test + def benchmark_2_1_1_MixedHighPriorityProducer() = { + + setName(format("1 high/mixed and 1 normal priority producer -> 1 destination -> 1 consumer")); + producerCount = 2; + destCount = 1; + consumerCount = 1; + + createConnections(); + var producer = producers.get(0); + producer.priority = 1; + producer.priorityMod = 3; + producer.rate.setName("High Priority Producer Rate"); + + consumers.get(0).thinkTime = 1 + + // Start 'em up. + startClients(); + try { + + System.out.println("Checking rates for test: " + getName()); + for (i <- 0 until PERFORMANCE_SAMPLES) { + var p = new Period(); + Thread.sleep(1000 * 5); + System.out.println(producer.rate.getRateSummary(p)); + System.out.println(totalProducerRate.getRateSummary(p)); + System.out.println(totalConsumerRate.getRateSummary(p)); + totalProducerRate.reset(); + totalConsumerRate.reset(); + } + + } finally { + stopServices(); + } + } + + def reportRates() = { + System.out.println("Checking rates for test: " + getName() + ", " +(if(ptp){"ptp"}else{"topic"}) ); + for (i <- 0 until PERFORMANCE_SAMPLES) { + var p = new Period(); + Thread.sleep(1000 * 5); + System.out.println(totalProducerRate.getRateSummary(p)); + System.out.println(totalConsumerRate.getRateSummary(p)); + totalProducerRate.reset(); + totalConsumerRate.reset(); + } + } + + def createConnections() = { + + if (multibroker) { + sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI); + rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI); + brokers.add(sendBroker); + brokers.add(rcvBroker); + } else { + sendBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI); + rcvBroker = sendBroker + brokers.add(sendBroker); + } + + startBrokers(); + + var dests = new Array[Destination](destCount); + + for (i <-0 until destCount) { + val domain = if (ptp) { Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN} + val name = new AsciiBuffer("dest" + (i + 1)) + var bean = new SingleDestination(domain, name) + dests(i) = bean; + if (ptp) { + sendBroker.defaultVirtualHost.createQueue(dests(i)); + if (multibroker) { + rcvBroker.defaultVirtualHost.createQueue(dests(i)); + } + } + } + + for (i <- 0 until producerCount) { + var destination = dests(i % destCount); + var producer = createProducer(i, destination); + producer.persistentDelivery = PERSISTENT; + producers.add(producer); + } + + for (i <- 0 until consumerCount) { + var destination = dests(i % destCount); + var consumer = createConsumer(i, destination); + consumer.durable = DURABLE; + consumers.add(consumer); + } + + // Create MultiBroker connections: + // if (multibroker) { + // Pipe pipe = new Pipe(); + // sendBroker.createBrokerConnection(rcvBroker, pipe); + // rcvBroker.createBrokerConnection(sendBroker, pipe.connect()); + // } + } + + def createConsumer(i:Int, destination:Destination):RemoteConsumer = { + + var consumer = createConsumer(); + consumer.exceptionListener= new ExceptionListener() { + def exceptionThrown(error:Exception ) = { + if (!stopping.get()) { + System.err.println("Consumer Async Error:"); + error.printStackTrace(); + } + } + } + + consumer.uri = new URI(rcvBroker.connectUris.head) + consumer.destination = destination + consumer.name = "consumer" + (i + 1) + consumer.totalConsumerRate = totalConsumerRate + return consumer; + } + + protected def createConsumer():RemoteConsumer + + private def createProducer(id:Int, destination:Destination):RemoteProducer = { + var producer = createProducer(); + producer.exceptionListener = new ExceptionListener() { + def exceptionThrown(error:Exception ) = { + if (!stopping.get()) { + System.err.println("Producer Async Error:"); + error.printStackTrace(); + } + } + } + producer.uri = new URI(sendBroker.connectUris.head) + producer.producerId = id + 1 + producer.name = "producer" + (id + 1) + producer.destination = destination + producer.messageIdGenerator = msgIdGenerator + producer.totalProducerRate = totalProducerRate + producer + } + + protected def createProducer():RemoteProducer + + private def createBroker(name:String , bindURI:String , connectUri:String ):Broker = { + val broker = new Broker() + broker.transportServers.add(TransportFactory.bind(new URI(bindURI))) + broker.connectUris.add(connectUri) +// TODO: +// broker.defaultVirtualHost.setStore(createStore(broker)) + broker + } + + protected def createStore(broker:Broker):Store = { + val store = if (USE_KAHA_DB) { + StoreFactory.createStore("kaha-db"); + } else { + StoreFactory.createStore("memory"); + } + store.setStoreDirectory(new File("target/test-data/broker-test/" + broker.name)); + store.setDeleteAllMessages(PURGE_STORE); + store + } + + private def stopServices() = { + stopping.set(true); + for (broker <- brokers) { + broker.stop(); + } + for (connection <- producers) { + connection.stop(); + } + for (connection <- consumers) { + connection.stop(); + } + } + + private def startBrokers() = { + for (broker <- brokers) { + broker.start(); + } + } + + private def startClients() = { + + for (connection <- consumers) { + connection.start(); + } + + for (connection <- producers) { + connection.start(); + } + } + +} Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java Wed Jul 7 03:40:18 2010 @@ -14,27 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.apollo.broker; +package org.apache.activemq.broker; -import java.util.Collection; - -import org.apache.activemq.apollo.broker.path.PathMap; +import java.beans.ExceptionListener; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.apollo.broker.Broker; +import org.apache.activemq.apollo.broker.Destination; +import org.apache.activemq.apollo.broker.Router; +import org.apache.activemq.broker.store.Store; +import org.apache.activemq.broker.store.StoreFactory; +import org.apache.activemq.metric.MetricAggregator; +import org.apache.activemq.metric.Period; +import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.util.buffer.AsciiBuffer; +import org.junit.Before; +import org.junit.Test; -public class Domain { - - private final PathMap targets = new PathMap(); - - synchronized public void bind(AsciiBuffer name, DeliveryTarget queue) { - targets.put(name, queue); - } - - synchronized public void unbind(AsciiBuffer name, DeliveryTarget queue) { - targets.remove(name, queue); - } - - synchronized public Collection route(AsciiBuffer name, MessageDelivery delivery) { - return targets.get(name); - } +import static java.lang.String.*; -} Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty) Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java?rev=961068&r1=961067&r2=961068&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java Wed Jul 7 03:40:18 2010 @@ -32,31 +32,32 @@ import org.apache.activemq.util.URISuppo public class JAXBBrokerFactory implements BrokerFactory.Handler { - public Broker createBroker(URI brokerURI) throws Exception { - JAXBContext context = JAXBContext.newInstance("org.apache.activemq.apollo.jaxb"); - Unmarshaller unmarshaller = context.createUnmarshaller(); + public Broker createBroker(String value) { + try { + URI brokerURI = new URI(value); + JAXBContext context = JAXBContext.newInstance("org.apache.activemq.apollo.jaxb"); + Unmarshaller unmarshaller = context.createUnmarshaller(); - URL configURL; - brokerURI = URISupport.stripScheme(brokerURI); - String scheme = brokerURI.getScheme(); - if( scheme==null || "file".equals(scheme) ) { - configURL = URISupport.changeScheme(URISupport.stripScheme(brokerURI), "file").toURL(); - } else if( "classpath".equals(scheme) ) { - configURL = Thread.currentThread().getContextClassLoader().getResource(brokerURI.getSchemeSpecificPart()); - } else { - configURL = URISupport.changeScheme(brokerURI, scheme).toURL(); - } - if (configURL == null) { - throw new IOException("Cannot create broker from non-existent URI: " + brokerURI); - } - XMLInputFactory factory = XMLInputFactory.newInstance(); - XMLStreamReader reader = factory.createXMLStreamReader(configURL.openStream()); - XMLStreamReader properties = new PropertiesReader(reader); - try { + URL configURL; + brokerURI = URISupport.stripScheme(brokerURI); + String scheme = brokerURI.getScheme(); + if( scheme==null || "file".equals(scheme) ) { + configURL = URISupport.changeScheme(URISupport.stripScheme(brokerURI), "file").toURL(); + } else if( "classpath".equals(scheme) ) { + configURL = Thread.currentThread().getContextClassLoader().getResource(brokerURI.getSchemeSpecificPart()); + } else { + configURL = URISupport.changeScheme(brokerURI, scheme).toURL(); + } + if (configURL == null) { + throw new IOException("Cannot create broker from non-existent URI: " + brokerURI); + } + XMLInputFactory factory = XMLInputFactory.newInstance(); + XMLStreamReader reader = factory.createXMLStreamReader(configURL.openStream()); + XMLStreamReader properties = new PropertiesReader(reader); BrokerXml xml = (BrokerXml) unmarshaller.unmarshal(properties); return xml.createMessageBroker(); - } catch (UnmarshalException e) { - throw new IOException("Cannot create broker from URI: " + brokerURI + ", reason: " + e.getCause()); + } catch (Exception e) { + throw new RuntimeException("Cannot create broker from URI: " + value, e); } } Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java?rev=961068&r1=961067&r2=961068&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java Wed Jul 7 03:40:18 2010 @@ -28,8 +28,7 @@ import org.apache.activemq.broker.store. public class MemoryStoreXml extends StoreXml { public Store createStore() { - MemoryStore rc = new MemoryStore(); - return rc; + return new MemoryStore(); } } Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java?rev=961068&r1=961067&r2=961068&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java Wed Jul 7 03:40:18 2010 @@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlTran import javax.xml.bind.annotation.adapters.XmlAdapter; import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.activemq.apollo.broker.BrokerDatabase; import org.apache.activemq.apollo.broker.VirtualHost; import org.apache.activemq.util.buffer.AsciiBuffer; @@ -34,19 +35,20 @@ import org.apache.activemq.util.buffer.A @XmlAccessorType(XmlAccessType.FIELD) public class VirtualHostXml { - @XmlJavaTypeAdapter(AsciiBufferAdapter.class) @XmlElement(name="host-name", required=true) - private ArrayList hostNames = new ArrayList(); + private ArrayList hostNames = new ArrayList(); @XmlElementRef private StoreXml store; public VirtualHost createVirtualHost(BrokerXml brokerXml) throws Exception { VirtualHost rc = new VirtualHost(); - rc.setHostNames(hostNames); - + rc.setNamesArray(hostNames); if( store != null ) { - rc.setStore(store.createStore()); + BrokerDatabase database = new BrokerDatabase(); + database.setVirtualHost(rc); + database.setStore(store.createStore()); + rc.setDatabase(database); } return rc; } Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java?rev=961068&r1=961067&r2=961068&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java Wed Jul 7 03:40:18 2010 @@ -39,66 +39,59 @@ public class JAXBConfigTest extends Test @Test() public void testSimpleConfig() throws Exception { - URI uri = new URI("jaxb:classpath:org/apache/activemq/apollo/jaxb/testSimpleConfig.xml"); + String uri = "jaxb:classpath:org/apache/activemq/apollo/jaxb/testSimpleConfig.xml"; LOG.info("Loading broker configuration from the classpath with URI: " + uri); - Broker broker = BrokerFactory.createBroker(uri); + Broker broker = BrokerFactory.createBroker(uri, false); // assertEquals(4, p.getSize()); // assertEquals("test dispatcher", p.getName()); - assertEquals(1, broker.getTransportServers().size()); + assertEquals(1, broker.transportServers().size()); ArrayList expected = new ArrayList(); expected.add("pipe://test1"); expected.add("tcp://127.0.0.1:61616"); - assertEquals(expected, broker.getConnectUris()); + assertEquals(expected, broker.connectUris() ); - assertEquals(2, broker.getVirtualHosts().size()); + assertEquals(2, broker.virtualHosts().size()); - assertNotNull(broker.getDefaultVirtualHost().getDatabase()); - assertNotNull(broker.getDefaultVirtualHost().getDatabase().getStore()); - assertTrue((broker.getDefaultVirtualHost().getDatabase().getStore() instanceof MemoryStore)); + assertNotNull(broker.defaultVirtualHost().getDatabase()); + assertNotNull(broker.defaultVirtualHost().getDatabase().getStore()); + assertTrue((broker.defaultVirtualHost().getDatabase().getStore() instanceof MemoryStore)); } @Test() public void testUris() throws Exception { - boolean failed = false; - // non-existent classpath + + // non-existent classpath try { - URI uri = new URI("jaxb:classpath:org/apache/activemq/apollo/jaxb/testUris-fail.xml"); - BrokerFactory.createBroker(uri); - } catch (IOException e) { - failed = true; + String uri = "jaxb:classpath:org/apache/activemq/apollo/jaxb/testUris-fail.xml"; + BrokerFactory.createBroker(uri, false); + fail("Creating broker from non-existing url does not throw an exception!"); + } catch (RuntimeException e) { } - if (!failed) { - fail("Creating broker from non-existing url does not throw an exception!"); - } - failed = false; - //non-existent file + + //non-existent file try { - URI uri = new URI("jaxb:file:/org/apache/activemq/apollo/jaxb/testUris-fail.xml"); - BrokerFactory.createBroker(uri); - } catch (IOException e) { - failed = true; - } - if (!failed) { - fail("Creating broker from non-existing url does not throw an exception!"); + String uri ="jaxb:file:/org/apache/activemq/apollo/jaxb/testUris-fail.xml"; + BrokerFactory.createBroker(uri, false); + fail("Creating broker from non-existing url does not throw an exception!"); + } catch (RuntimeException e) { } - //non-existent url + + //non-existent url try { - URI uri = new URI("jaxb:http://localhost/testUris.xml"); - BrokerFactory.createBroker(uri); - } catch (IOException e) { - failed = true; + String uri = "jaxb:http://localhost/testUris.xml"; + BrokerFactory.createBroker(uri, false); + fail("Creating broker from non-existing url does not throw an exception!"); + } catch (RuntimeException e) { } - if (!failed) { - fail("Creating broker from non-existing url does not throw an exception!"); - } + // regular file - URI uri = new URI("jaxb:" + Thread.currentThread().getContextClassLoader().getResource("org/apache/activemq/apollo/jaxb/testUris.xml")); - BrokerFactory.createBroker(uri); + String uri = "jaxb:" + Thread.currentThread().getContextClassLoader().getResource("org/apache/activemq/apollo/jaxb/testUris.xml"); + BrokerFactory.createBroker(uri, false); } } Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=961068&r1=961067&r2=961068&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Wed Jul 7 03:40:18 2010 @@ -33,7 +33,7 @@ import org.apache.activemq.util.URISuppo */ public class PipeTransportFactory implements TransportFactory.TransportFactorySPI { - static protected final HashMap servers = new HashMap(); + public static final HashMap servers = new HashMap(); public TransportServer bind(URI uri) throws URISyntaxException, IOException {