From commits-return-13991-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Wed Jul 07 03:41:26 2010 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 35182 invoked from network); 7 Jul 2010 03:41:25 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 7 Jul 2010 03:41:25 -0000 Received: (qmail 32223 invoked by uid 500); 7 Jul 2010 03:41:25 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 32194 invoked by uid 500); 7 Jul 2010 03:41:25 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 32169 invoked by uid 99); 7 Jul 2010 03:41:25 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:41:25 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:41:15 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1BD7F23888D1; Wed, 7 Jul 2010 03:40:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r961068 [1/4] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/main/java/org/apache... 
Date: Wed, 07 Jul 2010 03:40:20 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100707034021.1BD7F23888D1@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Wed Jul 7 03:40:18 2010 New Revision: 961068 URL: http://svn.apache.org/viewvc?rev=961068&view=rev Log: converting broker module to be scala based Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala 
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/XidImpl.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMap.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapEntry.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapNode.java 
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathNode.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathSupport.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/SimplePathFilter.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html - copied, changed from r961067, 
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/package.html activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapTest.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/ 
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java - copied, changed from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Removed: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerAware.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerFactory.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java 
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DeliveryTarget.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Destination.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandlerFactory.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/WildcardQueueSubscription.java 
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMap.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapEntry.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapNode.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathNode.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathSupport.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/SimplePathFilter.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/package.html activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java 
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapTest.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteConsumer.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml?rev=961068&r1=961067&r2=961068&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml Wed Jul 7 03:40:18 2010 @@ -35,11 +35,26 @@ org.fusesource.hawtdispatch - hawtdispatch + hawtdispatch-scala ${hawtdispatch-version} + org.scala-lang + 
scala-library + compile + ${scala-version} + + + + org.scala-lang + scala-compiler + ${scala-version} + compile + true + + + org.apache.activemq activemq-transport @@ -78,6 +93,10 @@ + install + src/main/scala + src/test/scala + target/schema @@ -108,6 +127,43 @@ + + org.scala-tools + maven-scala-plugin + 2.13.1 + + + + compile + testCompile + + + + + + -Xmx1024m + + + -deprecation + -Xno-varargs-conversion + + ${scala-version} + + + + + maven-surefire-plugin + 2.4.3 + + + false + + false + true + false + + + Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala?rev=961068&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala (added) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 03:40:18 2010 @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.apollo.broker + +import _root_.java.util.concurrent.atomic.AtomicLong +import _root_.org.apache.activemq.util.buffer._ +import _root_.org.fusesource.hawtdispatch._ +import _root_.org.fusesource.hawtdispatch.ScalaDispatch._ + +import java.util.HashMap +import collection.JavaConversions +import path.PathMap + +object Domain { + val TOPIC_DOMAIN = new AsciiBuffer("topic"); + val QUEUE_DOMAIN = new AsciiBuffer("queue"); + val TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic"); + val TEMP_QUEUE_DOMAIN = new AsciiBuffer("temp-queue"); +} + +import Domain._ +class Domain { + + val targets = new PathMap[DeliveryTarget](); + + def bind(name:AsciiBuffer, queue:DeliveryTarget) = { + targets.put(name, queue); + } + + def unbind(name:AsciiBuffer, queue:DeliveryTarget) = { + targets.remove(name, queue); + } + +// +// synchronized public Collection route(AsciiBuffer name, MessageDelivery delivery) { +// return targets.get(name); +// } + +} + + +/** + * Provides a non-blocking concurrent producer to consumer + * routing implementation. + * + * DeliveryProducers create a route object for each destination + * they will be producing to. Once the route is + * connected to the router, the producer can use + * the route.targets list without synchronization to + * get the current set of consumers that are bound + * to the destination. 
+ * + */ +class Router(var queue:DispatchQueue) { + + trait DestinationNode { + var targets = List[DeliveryTarget]() + var routes = List[DeliveryProducerRoute]() + + def on_bind(x:List[DeliveryTarget]):Unit + def on_unbind(x:List[DeliveryTarget]):Boolean + def on_connect(route:DeliveryProducerRoute):Unit + def on_disconnect(route:DeliveryProducerRoute):Boolean = { + routes = routes.filterNot({r=> route==r}) + route.disconnected() + routes == Nil && targets == Nil + } + } + + class TopicDestinationNode extends DestinationNode { + def on_bind(x:List[DeliveryTarget]) = { + targets = x ::: targets + routes.foreach({r=> + r.bind(x) + }) + } + + def on_unbind(x:List[DeliveryTarget]):Boolean = { + targets = targets.filterNot({t=>x.contains(t)}) + routes.foreach({r=> + r.unbind(x) + }) + routes == Nil && targets == Nil + } + + def on_connect(route:DeliveryProducerRoute) = { + routes = route :: routes + route.connected(targets) + } + } + + class QueueDestinationNode(destination:Destination) extends DestinationNode { + val queue = new Queue(destination) + + def on_bind(x:List[DeliveryTarget]) = { + targets = x ::: targets + queue.bind(x) + } + + def on_unbind(x:List[DeliveryTarget]):Boolean = { + targets = targets.filterNot({t=>x.contains(t)}) + queue.unbind(x) + routes == Nil && targets == Nil + } + + def on_connect(route:DeliveryProducerRoute) = { + routes = route :: routes + route.connected(queue :: Nil) + } + } + + var destinations = new HashMap[Destination, DestinationNode]() + + private def get(destination:Destination):DestinationNode = { + var result = destinations.get(destination) + if( result ==null ) { + if( isTopic(destination) ) { + result = new TopicDestinationNode + } else { + result = new QueueDestinationNode(destination) + } + destinations.put(destination, result) + } + result + } + + def bind(destination:Destination, targets:List[DeliveryTarget]) = retaining(targets) { + get(destination).on_bind(targets) + } ->: queue + + def unbind(destination:Destination, 
targets:List[DeliveryTarget]) = releasing(targets) { + if( get(destination).on_unbind(targets) ) { + destinations.remove(destination) + } + } ->: queue + + def connect(destination:Destination, routeQueue:DispatchQueue, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = { + val route = new DeliveryProducerRoute(destination, routeQueue, producer) { + override def on_connected = { + completed(this); + } + } + ^ { + get(destination).on_connect(route) + } ->: queue + } + + def isTopic(destination:Destination) = destination.getDomain == TOPIC_DOMAIN + def isQueue(destination:Destination) = !isTopic(destination) + + def disconnect(route:DeliveryProducerRoute) = releasing(route) { + get(route.destination).on_disconnect(route) + } ->: queue + + + def each(proc:(Destination, DestinationNode)=>Unit) = { + import JavaConversions._; + for( (destination, node) <- destinations ) { + proc(destination, node) + } + } + +} + +trait Route extends Retained { + + val destination:Destination + val queue:DispatchQueue + val metric = new AtomicLong(); + + def connected(targets:List[DeliveryTarget]):Unit + def bind(targets:List[DeliveryTarget]):Unit + def unbind(targets:List[DeliveryTarget]):Unit + def disconnected():Unit + +} + +class DeliveryProducerRoute(val destination:Destination, val queue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route { + + + // Retain the queue while we are retained. 
+ queue.retain + setDisposer(^{ + queue.release + }) + + var targets = List[DeliveryTargetSession]() + + def connected(targets:List[DeliveryTarget]) = retaining(targets) { + internal_bind(targets) + on_connected + } ->: queue + + def bind(targets:List[DeliveryTarget]) = retaining(targets) { + internal_bind(targets) + } ->: queue + + private def internal_bind(values:List[DeliveryTarget]) = { + values.foreach{ x=> + targets = x.open_session(queue) :: targets + } + } + + def unbind(targets:List[DeliveryTarget]) = releasing(targets) { + this.targets = this.targets.filterNot { x=> + val rc = targets.contains(x.consumer) + if( rc ) { + x.close + } + rc + } + } ->: queue + + def disconnected() = ^ { + this.targets.foreach { x=> + x.close + x.consumer.release + } + } ->: queue + + protected def on_connected = {} + protected def on_disconnected = {} + +} Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961068&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (added) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:40:18 2010 @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.broker + +import _root_.java.beans.ExceptionListener +import _root_.java.io.{IOException} +import _root_.java.util.{LinkedHashMap, HashMap} +import _root_.org.apache.activemq.filter.{BooleanExpression} +import _root_.org.apache.activemq.transport._ +import _root_.org.apache.activemq.Service +import _root_.java.lang.{String} +import _root_.org.apache.activemq.util.{FactoryFinder, IOExceptionSupport} +import _root_.org.apache.activemq.wireformat.WireFormat +import _root_.org.fusesource.hawtdispatch.ScalaDispatch._ +class ConnectionConfig { + +} +abstract class Connection() extends TransportListener with Service { + + val q = createQueue("connection") + var name = "connection" + var stopping = false; + + var transport:Transport = null + var exceptionListener:ExceptionListener = null; + + def start() = { + transport.setDispatchQueue(q); + transport.getDispatchQueue.release + transport.setTransportListener(this); + transport.start() + } + + def stop() = { + stopping=true + transport.stop() + q.release + } + + def onException(error:IOException) = { + if (!stopping) { + onFailure(error); + } + } + + def onFailure(error:Exception) = { + if (exceptionListener != null) { + exceptionListener.exceptionThrown(error); + } + } + + def onDisconnected() = { + } + + def onConnected() = { + } + +} + +object BrokerConnection extends Log + +class BrokerConnection(val broker: Broker) extends Connection with Logging { + override protected def log = BrokerConnection + + var protocolHandler: ProtocolHandler = null; + + exceptionListener 
= new ExceptionListener() { + def exceptionThrown(error:Exception) = { + info("Transport failed before messaging protocol was initialized.", error); + stop() + } + } + + + def onCommand(command: Object) = { + if (protocolHandler != null) { + protocolHandler.onCommand(command); + } else { + try { + var wireformat:WireFormat = null; + + if (command.isInstanceOf[WireFormat]) { + + // First command might be from the wire format discriminator, letting + // us know what the actual wireformat is. + wireformat = command.asInstanceOf[WireFormat]; + + try { + protocolHandler = ProtocolHandlerFactory.createProtocolHandler(wireformat.getName()); + } catch { + case e:Exception=> + throw IOExceptionSupport.create("No protocol handler available for: " + wireformat.getName(), e); + } + + protocolHandler.setConnection(this); + protocolHandler.setWireFormat(wireformat); + protocolHandler.start(); + + exceptionListener = new ExceptionListener() { + def exceptionThrown(error:Exception) { + protocolHandler.onException(error); + } + } + protocolHandler.onCommand(command); + + } else { + throw new IOException("First command should be a WireFormat"); + } + + } catch { + case e:Exception => + onFailure(e); + } + } + } + + override def stop() = { + super.stop(); + if (protocolHandler != null) { + protocolHandler.stop(); + } + } +} + + +object ProtocolHandlerFactory { + val PROTOCOL_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/protocol/"); + + def createProtocolHandler(protocol:String) = { + PROTOCOL_HANDLER_FINDER.newInstance(protocol).asInstanceOf[ProtocolHandler] + } +} + +trait ProtocolHandler extends Service { + + def onCommand(command:Any); + def setConnection(brokerConnection:BrokerConnection); + def setWireFormat(wireformat:WireFormat); + def onException(error:Exception); + +// TODO: +// public void setConnection(BrokerConnection connection); +// +// public BrokerConnection getConnection(); +// +// public void onCommand(Object command); +// +// 
public void onException(Exception error); +// +// public void setWireFormat(WireFormat wf); +// +// public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException; +// +// /** +// * ClientContext +// *

+// * Description: Base interface describing a channel on a physical +// * connection. +// *

+// * +// * @author cmacnaug +// * @version 1.0 +// */ +// public interface ClientContext { +// public ClientContext getParent(); +// +// public Collection getChildren(); +// +// public void addChild(ClientContext context); +// +// public void removeChild(ClientContext context); +// +// public void close(); +// +// } +// +// public abstract class AbstractClientContext extends AbstractLimitedFlowResource implements ClientContext { +// protected final HashSet children = new HashSet(); +// protected final ClientContext parent; +// protected boolean closed = false; +// +// public AbstractClientContext(String name, ClientContext parent) { +// super(name); +// this.parent = parent; +// if (parent != null) { +// parent.addChild(this); +// } +// } +// +// public ClientContext getParent() { +// return parent; +// } +// +// public void addChild(ClientContext child) { +// if (!closed) { +// children.add(child); +// } +// } +// +// public void removeChild(ClientContext child) { +// if (!closed) { +// children.remove(child); +// } +// } +// +// public Collection getChildren() { +// return children; +// } +// +// public void close() { +// +// closed = true; +// +// for (ClientContext c : children) { +// c.close(); +// } +// +// if (parent != null) { +// parent.removeChild(this); +// } +// +// super.close(); +// } +// } +// +} + +trait ConsumerContext { // extends ClientContext, Subscription, IFlowSink { + + def getConsumerId() : String + + def getDestination(): Destination + + def getSelector() : String + + def getSelectorExpression() : BooleanExpression + + def isDurable() : Boolean + + def getSubscriptionName() : String + + /** + * If the destination does not exist, should it automatically be + * created? 
+ * + * @return + */ + def autoCreateDestination():Boolean + + def isPersistent() : Boolean + +} + Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961068&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (added) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:40:18 2010 @@ -0,0 +1,568 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.apollo.broker + +import _root_.java.util.{LinkedList, LinkedHashMap, HashMap} +import _root_.org.apache.activemq.filter.{MessageEvaluationContext} +import _root_.java.lang.{String} +import _root_.org.apache.activemq.util.buffer.{Buffer, AsciiBuffer} +import _root_.org.fusesource.hawtdispatch._ +import _root_.org.fusesource.hawtdispatch.ScalaDispatch._ + +trait DeliveryProducer { + def collocate(queue:DispatchQueue):Unit +} + +trait DeliveryTargetSession { + val consumer:DeliveryTarget + def deliver(delivery:MessageDelivery) + def close:Unit +} +trait DeliveryTarget extends Retained { + def matches(message:MessageDelivery) + val queue:DispatchQueue; + def open_session(producer_queue:DispatchQueue):DeliveryTargetSession +} + +/** + * Abstracts wire protocol message implementations. Each wire protocol + * will provide it's own type of Message. + */ +trait Message { + + /** + * the globally unique id of the message + */ + def id: AsciiBuffer + + /** + * the globally unique id of the producer + */ + def producer: AsciiBuffer + + /** + * the message priority. + */ + def priority:Byte + + /** + * a positive value indicates that the delivery has an expiration + * time. + */ + def expiration: Long + + /** + * true if the delivery is persistent + */ + def persistent: Boolean + + /** + * where the message was sent to. + */ + def destination: Destination + + /** + * used to apply a selector against the message. + */ + def messageEvaluationContext:MessageEvaluationContext + +} + +object MessageDelivery { + def apply(o:MessageDelivery) = new MessageDelivery(o.message, o.encoded, o.encoding, o.size, o.ack, o.tx_id, o.store_id) +} + +case class MessageDelivery ( + + /** + * the message being delivered + */ + message: Message, + + /** + * the encoded form of the message being delivered. + */ + encoded: Buffer, + + /** + * the encoding format of the message + */ + encoding: String, + + /** + * memory size of the delivery. 
Used for resource allocation tracking + */ + size:Int, + + /** + * true if this delivery requires acknowledgment. + */ + ack:Boolean, + + /** + * The id used to identify the transaction that the message + * belongs to. + */ + tx_id:Long, + + /** + * The id used to identify this message in the message + * store. + * + * @return The store tracking or -1 if not set. + */ + store_id: Long + +) extends BaseRetained { + +} + +//abstract class BrokerMessageDelivery extends MessageDelivery { +// TODO: +// // True while the message is being dispatched to the delivery targets: +// boolean dispatching = false; +// +// // A non null pending save indicates that the message is the +// // saver queue and that the message +// OperationContext pendingSave; +// +// // List of persistent targets for which the message should be saved +// // when dispatch is complete: +// HashMap> persistentTargets; +// SaveableQueueElement singleTarget; +// +// long storeTracking = -1; +// BrokerDatabase store; +// boolean fromStore = false; +// boolean enableFlushDelay = true; +// private int limiterSize = -1; +// private long tid=-1; +// +// public void setFromDatabase(BrokerDatabase database, MessageRecord mRecord) { +// fromStore = true; +// store = database; +// storeTracking = mRecord.getKey(); +// limiterSize = mRecord.getSize(); +// } +// +// public final int getFlowLimiterSize() { +// if (limiterSize == -1) { +// limiterSize = getMemorySize(); +// } +// return limiterSize; +// } +// +// /** +// * When an application wishes to include a message in a broker transaction +// * it must set this the tid returned by {@link Transaction#getTid()} +// * +// * @param tid +// * Sets the tid used to identify the transaction at the broker. +// */ +// public void setTransactionId(long tid) { +// this.tid = tid; +// } +// +// /** +// * @return The tid used to identify the transaction at the broker. 
+// */ +// public final long getTransactionId() { +// return tid; +// } +// +// public final void clearTransactionId() { +// tid = -1; +// } +// +// /** +// * Subclass must implement this to return their current memory size +// * estimate. +// * +// * @return The memory size of the message. +// */ +// public abstract int getMemorySize(); +// +// public final boolean isFromStore() { +// return fromStore; +// } +// +// public final void persist(SaveableQueueElement sqe, ISourceController controller, boolean delayable) { +// synchronized (this) { +// // Can flush of this message to the store be delayed? +// if (enableFlushDelay && !delayable) { +// enableFlushDelay = false; +// } +// // If this message is being dispatched then add the queue to the +// // list of queues for which to save the message when dispatch is +// // finished: +// if (dispatching) { +// addPersistentTarget(sqe); +// return; +// } +// // Otherwise, if it is still in the saver queue, we can add this +// // queue to the queue list: +// else if (pendingSave != null) { +// addPersistentTarget(sqe); +// if (!delayable) { +// pendingSave.requestFlush(); +// } +// return; +// } +// } +// +// store.saveMessage(sqe, controller, delayable); +// } +// +// public final void acknowledge(SaveableQueueElement sqe) { +// boolean firePersistListener = false; +// boolean deleted = false; +// synchronized (this) { +// // If the message hasn't been saved to the database +// // then we don't need to issue a delete: +// if (dispatching || pendingSave != null) { +// +// deleted = true; +// +// removePersistentTarget(sqe.getQueueDescriptor()); +// // We get a save context when we place the message in the +// // database queue. 
If it has been added to the queue, +// // and we've removed the last queue, see if we can cancel +// // the save: +// if (pendingSave != null && !hasPersistentTargets()) { +// if (pendingSave.cancel()) { +// pendingSave = null; +// if (isPersistent()) { +// firePersistListener = true; +// } +// } +// } +// } +// } +// +// if (!deleted) { +// store.deleteQueueElement(sqe); +// } +// +// if (firePersistListener) { +// onMessagePersisted(); +// } +// +// } +// +// public final void setStoreTracking(long tracking) { +// if (storeTracking == -1) { +// storeTracking = tracking; +// } +// } +// +// public final void beginDispatch(BrokerDatabase database) { +// this.store = database; +// dispatching = true; +// setStoreTracking(database.allocateStoreTracking()); +// } +// +// public long getStoreTracking() { +// return storeTracking; +// } +// +// public synchronized Collection> getPersistentQueues() { +// if (singleTarget != null) { +// ArrayList> list = new ArrayList>(1); +// list.add(singleTarget); +// return list; +// } else if (persistentTargets != null) { +// return persistentTargets.values(); +// } +// return null; +// } +// +// public void beginStore() { +// synchronized (this) { +// pendingSave = null; +// } +// } +// +// private final boolean hasPersistentTargets() { +// return (persistentTargets != null && !persistentTargets.isEmpty()) || singleTarget != null; +// } +// +// private final void removePersistentTarget(QueueDescriptor queue) { +// if (persistentTargets != null) { +// persistentTargets.remove(queue); +// return; +// } +// +// if (singleTarget != null && singleTarget.getQueueDescriptor().equals(queue)) { +// singleTarget = null; +// } +// } +// +// private final void addPersistentTarget(SaveableQueueElement elem) { +// if (persistentTargets != null) { +// persistentTargets.put(elem.getQueueDescriptor(), elem); +// return; +// } +// +// if (singleTarget == null) { +// singleTarget = elem; +// return; +// } +// +// if (elem.getQueueDescriptor() != 
singleTarget.getQueueDescriptor()) { +// persistentTargets = new HashMap>(); +// persistentTargets.put(elem.getQueueDescriptor(), elem); +// persistentTargets.put(singleTarget.getQueueDescriptor(), singleTarget); +// singleTarget = null; +// } +// } +// +// public final void finishDispatch(ISourceController controller) throws IOException { +// boolean firePersistListener = false; +// synchronized (this) { +// // If any of the targets requested save then save the message +// // Note that this could be the case even if the message isn't +// // persistent if a target requested that the message be spooled +// // for some other reason such as queue memory overflow. +// if (hasPersistentTargets()) { +// pendingSave = store.persistReceivedMessage(this, controller); +// } +// +// // If none of the targets required persistence, then fire the +// // persist listener: +// if (pendingSave == null || !isPersistent()) { +// firePersistListener = true; +// } +// dispatching = false; +// } +// +// if (firePersistListener) { +// onMessagePersisted(); +// } +// } +// +// public final MessageRecord createMessageRecord() { +// +// MessageRecord record = new MessageRecord(); +// record.setEncoding(getStoreEncoding()); +// record.setBuffer(getStoreEncoded()); +// record.setStreamKey((long) 0); +// record.setMessageId(getMsgId()); +// record.setSize(getFlowLimiterSize()); +// record.setKey(getStoreTracking()); +// return record; +// } +// +// /** +// * @return A buffer representation of the message to be stored in the store. +// * @throws +// */ +// protected abstract Buffer getStoreEncoded(); +// +// /** +// * @return The encoding scheme used to store the message. 
+// */ +// protected abstract AsciiBuffer getStoreEncoding(); +// +// public boolean isFlushDelayable() { +// // TODO Auto-generated method stub +// return enableFlushDelay; +// } +//} + +/** + * + * @author Hiram Chirino + */ +class DeliveryBuffer(var maxSize:Int=1024*32) { + + var deliveries = new LinkedList[MessageDelivery]() + private var size = 0 + var eventHandler: Runnable = null + + def full = size >= maxSize + + def drain = eventHandler.run + + def receive = deliveries.poll + + def isEmpty = deliveries.isEmpty + + def send(delivery:MessageDelivery):Unit = { + delivery.retain + size += delivery.size + deliveries.addLast(delivery) + if( deliveries.size == 1 ) { + drain + } + } + + def ack(delivery:MessageDelivery) = { + // When a message is delivered to the consumer, we release + // used capacity in the outbound queue, and can drain the inbound + // queue + val wasBlocking = full + size -= delivery.size + delivery.release + if( !isEmpty ) { + drain + } + } + +} + +class DeliveryOverflowBuffer(val delivery_buffer:DeliveryBuffer) { + + private var overflow = new LinkedList[MessageDelivery]() + + protected def drainOverflow:Unit = { + while( !overflow.isEmpty && !full ) { + val delivery = overflow.removeFirst + delivery.release + send_to_delivery_queue(delivery) + } + } + + def send(delivery:MessageDelivery) = { + if( full ) { + // Deliveries in the overflow queue is remain acquired by us so that + // producer that sent it to us gets flow controlled. 
+ delivery.retain + overflow.addLast(delivery) + } else { + send_to_delivery_queue(delivery) + } + } + + protected def send_to_delivery_queue(value:MessageDelivery) = { + var delivery = MessageDelivery(value) + delivery.setDisposer(^{ + drainOverflow + }) + delivery_buffer.send(delivery) + delivery.release + } + + def full = delivery_buffer.full + +} + +class DeliveryCreditBufferProtocol(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained { + + var sessions = List[CreditServer]() + + var session_min_credits = 1024*4; + var session_credit_capacity = 1024*32 + var session_max_credits = session_credit_capacity; + + queue.retain + setDisposer(^{ + source.release + queue.release + }) + + // use a event aggregating source to coalesce multiple events from the same thread. + val source = createSource(new ListEventAggregator[MessageDelivery](), queue) + source.setEventHandler(^{drain_source}); + source.resume + + def drain_source = { + val deliveries = source.getData + deliveries.foreach { delivery=> + delivery_buffer.send(delivery) + delivery.release + } + } + + + class CreditServer(val producer_queue:DispatchQueue) { + private var _capacity = 0 + + def capacity(value:Int) = { + val change = value - _capacity; + _capacity = value; + client.credit(change) + } + + def drain(callback:Runnable) = { + client.drain(callback) + } + + val client = new CreditClient() + + class CreditClient() extends DeliveryOverflowBuffer(delivery_buffer) { + + producer_queue.retain + val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue) + credit_adder.setEventHandler(^{ + internal_credit(credit_adder.getData.intValue) + }); + credit_adder.resume + + private var credits = 0; + + /////////////////////////////////////////////////// + // These methods get called from the client/producer thread... 
+ /////////////////////////////////////////////////// + def close = { + credit_adder.release + producer_queue.release + } + + override def full = credits <= 0 + + override protected def send_to_delivery_queue(value:MessageDelivery) = { + var delivery = MessageDelivery(value) + delivery.setDisposer(^{ + // This is called from the server/consumer thread + credit_adder.merge(delivery.size); + }) + internal_credit(-delivery.size) + source.merge(delivery) + } + + def internal_credit(value:Int) = { + credits += value; + if( credits <= 0 ) { + credits = 0 + } else { + drainOverflow + } + } + + /////////////////////////////////////////////////// + // These methods get called from the server/consumer thread... + /////////////////////////////////////////////////// + def credit(value:Int) = ^{ internal_credit(value) } ->: producer_queue + + def drain(callback:Runnable) = { + credits = 0 + if( callback!=null ) { + queue << callback + } + } + } + } + + def session(queue:DispatchQueue) = { + val session = new CreditServer(queue) + sessions = session :: sessions + session.capacity(session_max_credits) + session.client + } + + +} Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala?rev=961068&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala (added) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala Wed Jul 7 03:40:18 2010 @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.broker + +import _root_.java.util.{LinkedHashMap, HashMap} +import _root_.org.apache.activemq.util.buffer.{AsciiBuffer} +import BufferConversions._ + +class ParserOptions { + var defaultDomain:AsciiBuffer = null + var queuePrefix:AsciiBuffer = null + var topicPrefix:AsciiBuffer = null + var tempQueuePrefix:AsciiBuffer = null + var tempTopicPrefix:AsciiBuffer = null +} + +trait Destination { + def getDomain(): AsciiBuffer + def getName(): AsciiBuffer + def getDestinations():Seq[Destination] +} + +object Destination { + + /** + * Parses a simple destination. 
+   *
+   * The configured prefixes are tried in the order queue, topic, temp queue,
+   * temp topic. The first prefix that is non-null and that value starts with
+   * selects the domain, and the text following that prefix becomes the name.
+   * When no prefix applies, the whole value is used as the name within
+   * options.defaultDomain.
+   *
+   * @param value the destination text to parse
+   * @param options the prefixes and default domain to parse with
+   * @return a SingleDestination describing value
+   * @throws IllegalArgumentException when no prefix applies and
+   *         options.defaultDomain is null
+   */
+  def parse(value:AsciiBuffer, options:ParserOptions ):Destination = {
+
+    // True when the prefix is configured and value begins with it.
+    def matches(prefix:AsciiBuffer) = prefix!=null && value.startsWith(prefix)
+
+    // The part of value that follows the prefix.
+    def remainder(prefix:AsciiBuffer) = value.slice(prefix.length, value.length).ascii()
+
+    if (matches(options.queuePrefix)) {
+      new SingleDestination(Domain.QUEUE_DOMAIN, remainder(options.queuePrefix))
+    } else if (matches(options.topicPrefix)) {
+      new SingleDestination(Domain.TOPIC_DOMAIN, remainder(options.topicPrefix))
+    } else if (matches(options.tempQueuePrefix)) {
+      new SingleDestination(Domain.TEMP_QUEUE_DOMAIN, remainder(options.tempQueuePrefix))
+    } else if (matches(options.tempTopicPrefix)) {
+      new SingleDestination(Domain.TEMP_TOPIC_DOMAIN, remainder(options.tempTopicPrefix))
+    } else if (options.defaultDomain!=null) {
+      new SingleDestination(options.defaultDomain, value)
+    } else {
+      throw new IllegalArgumentException("Destination domain not provided: "+value)
+    }
+  }
+
+  /**
+   * Parses a destination which may or may not be a composite.
+   *
+   * A null value yields null. When value contains compositeSeparator it is
+   * split on that byte, every part is parsed with parse(value, options), and
+   * the results are collected in a MultiDestination in the same order as the
+   * parts appear in value. Otherwise value is parsed as a simple destination.
+   *
+   * @param value the destination text to parse, may be null
+   * @param options the prefixes and default domain to parse with
+   * @param compositeSeparator the byte that separates the parts of a composite
+   * @return null, a MultiDestination, or the result of parse(value, options)
+   */
+  def parse(value:AsciiBuffer, options:ParserOptions , compositeSeparator:Byte ):Destination = {
+    if( value == null ) {
+      return null;
+    }
+
+    if( value.contains(compositeSeparator) ) {
+      val rc = value.split(compositeSeparator);
+      val md = new MultiDestination();
+      for (buffer <- rc) {
+        // Append (rather than prepend with ::=) so that the parts keep the
+        // order in which they were written in value.
+        md.destinations = md.destinations ::: parse(buffer, options) :: Nil
+      }
+      return md;
+    }
+    return parse(value, options);
+  }
+}
+
+/**
+ * A destination made of one domain and one name.
+ * getDestinations() returns null for this kind of destination.
+ */
+class SingleDestination(var domain:AsciiBuffer=null, var name:AsciiBuffer=null) extends Destination {
+
+  def getDestinations():Seq[Destination] = null;
+  def getDomain():AsciiBuffer = domain
+  def getName():AsciiBuffer = name
+
+  // Rendered as domain and name joined by a colon.
+  override def toString() = ""+domain+":"+name
+}
+
+/**
+ * A destination that is a list of other destinations.
+ * getDomain() and getName() return null for this kind of destination.
+ */
+class MultiDestination(var destinations:List[Destination]=Nil) extends Destination {
+
+  def getDestinations():Seq[Destination] = destinations;
+  def getDomain():AsciiBuffer = null
+  def getName():AsciiBuffer = null
+
+  // Rendered as the contained destinations joined by commas.
+  override def toString() = destinations.mkString(",")
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala Wed Jul 7 03:40:18 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.util.{LinkedHashMap, HashMap}
+import _root_.java.lang.{Throwable, String}
+import _root_.org.apache.commons.logging.LogFactory
+import _root_.org.apache.commons.logging.{Log => Logger}
+
+/**
+ * Holds a commons-logging log that is named after the runtime class of
+ * whatever mixes this trait in.
+ */
+trait Log {
+  val log = LogFactory.getLog(getClass.getName)
+}
+
+/**
+ * A Logging trait you can mix into an implementation class without affecting its public API
+ *
+ * The message parameters are passed by name. Each helper checks that its level
+ * is enabled before touching the message, so the message expression is only
+ * evaluated when it is actually going to be logged.
+ */
+trait Logging {
+
+  /** The Log holder whose commons-logging log the helpers below write to. */
+  protected def log: Log
+
+  protected def error(message: => String): Unit = if (log.log.isErrorEnabled) log.log.error(message)
+
+  /** Logs the exception at error level, using its own message as the log message. */
+  protected def error(e: Throwable): Unit = log.log.error(e.getMessage, e)
+
+  protected def error(message: => String, e: Throwable): Unit = if (log.log.isErrorEnabled) log.log.error(message, e)
+
+  protected def warn(message: => String): Unit = if (log.log.isWarnEnabled) log.log.warn(message)
+
+  protected def warn(message: => String, e: Throwable): Unit = if (log.log.isWarnEnabled) log.log.warn(message, e)
+
+  protected def info(message: => String): Unit = if (log.log.isInfoEnabled) log.log.info(message)
+
+  protected def info(message: => String, e: Throwable): Unit = if (log.log.isInfoEnabled) log.log.info(message, e)
+
+  protected def debug(message: => String): Unit = if (log.log.isDebugEnabled) log.log.debug(message)
+
+  protected def debug(message: => String, e: Throwable): Unit = if (log.log.isDebugEnabled) log.log.debug(message, e)
+
+  protected def trace(message: => String): Unit = if (log.log.isTraceEnabled) log.log.trace(message)
+
+  protected def trace(message: => String, e: Throwable): Unit = if (log.log.isTraceEnabled) log.log.trace(message, e)
+
+}
+
Added: 
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala?rev=961068&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala (added) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala Wed Jul 7 03:40:18 2010 @@ -0,0 +1,571 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.apollo.broker + +import _root_.java.io.{File} +import _root_.java.util.{LinkedList, LinkedHashMap, ArrayList, HashMap} +import _root_.org.apache.activemq.transport._ +import _root_.org.apache.activemq.Service +import _root_.java.lang.{String} +import _root_.org.apache.activemq.util.buffer.{Buffer, UTF8Buffer, AsciiBuffer} +import _root_.org.apache.activemq.util.{FactoryFinder, IOHelper} +import _root_.org.fusesource.hawtdispatch.ScalaDispatch._ +import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained} + +import _root_.scala.collection.JavaConversions._ + +object BrokerFactory { + + val BROKER_FACTORY_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/apollo/broker/"); + + trait Handler { + def createBroker(brokerURI:String):Broker + } + + + def createHandler(name:String):Handler = { + BROKER_FACTORY_HANDLER_FINDER.newInstance(name).asInstanceOf[Handler] + } + + /** + * Creates a broker from a URI configuration + * + * @param brokerURI the URI scheme to configure the broker + * @param startBroker whether or not the broker should have its + * {@link Broker#start()} method called after + * construction + * @throws Exception + */ + def createBroker(brokerURI:String, startBroker:Boolean=false):Broker = { + var split = brokerURI.split(":") + if (split.length < 2 ) { + throw new IllegalArgumentException("Invalid broker URI, no scheme specified: " + brokerURI) + } + var handler = createHandler(split(0)) + var broker = handler.createBroker(brokerURI) + if (startBroker) { + broker.start(); + } + return broker; + } + +} + +object BufferConversions { + + implicit def toAsciiBuffer(value:String) = new AsciiBuffer(value) + implicit def toUTF8Buffer(value:String) = new UTF8Buffer(value) + implicit def fromAsciiBuffer(value:AsciiBuffer) = value.toString + implicit def fromUTF8Buffer(value:UTF8Buffer) = value.toString + + implicit def toAsciiBuffer(value:Buffer) = value.ascii + implicit def 
toUTF8Buffer(value:Buffer) = value.utf8 +} + +import BufferConversions._ + +object BrokerConstants extends Log { + val CONFIGURATION = "CONFIGURATION" + val STOPPED = "STOPPED" + val STARTING = "STARTING" + val STOPPING = "STOPPING" + val RUNNING = "RUNNING" + val UNKNOWN = "UNKNOWN" + + val DEFAULT_VIRTUAL_HOST_NAME = new AsciiBuffer("default") +} + +class Broker() extends Service with Logging { + + import BrokerConstants._ + override protected def log = BrokerConstants + + class BrokerAcceptListener extends TransportAcceptListener { + def onAcceptError(error: Exception): Unit = { + warn("Accept error: " + error) + debug("Accept error details: ", error) + } + + def onAccept(transport: Transport): Unit = { + var connection = new BrokerConnection(Broker.this) + connection.transport = transport + clientConnections.add(connection) + try { + connection.start + } + catch { + case e1: Exception => { + onAcceptError(e1) + } + } + } + } + + val q = createQueue("broker"); + + var connectUris: List[String] = Nil + val virtualHosts: LinkedHashMap[AsciiBuffer, VirtualHost] = new LinkedHashMap[AsciiBuffer, VirtualHost] + val transportServers: ArrayList[TransportServer] = new ArrayList[TransportServer] + val clientConnections: ArrayList[Connection] = new ArrayList[Connection] + var dataDirectory: File = null + var state = CONFIGURATION + var name = "broker"; + var defaultVirtualHost: VirtualHost = null + + def removeConnectUri(uri: String): Unit = ^ { + this.connectUris = this.connectUris.filterNot(_==uri) + } ->: q + + def getVirtualHost(name: AsciiBuffer, cb: (VirtualHost) => Unit) = callback(cb) { + virtualHosts.get(name) + } ->: q + + def getConnectUris(cb: (List[String]) => Unit) = callback(cb) { + connectUris + } ->: q + + + def getDefaultVirtualHost(cb: (VirtualHost) => Unit) = callback(cb) { + defaultVirtualHost + } ->: q + + def addVirtualHost(host: VirtualHost) = ^ { + if (host.names.isEmpty) { + throw new IllegalArgumentException("Virtual host must be configured with 
at least one host name.") + } + for (name <- host.names) { + if (virtualHosts.containsKey(name)) { + throw new IllegalArgumentException("Virtual host with host name " + name + " already exists.") + } + } + for (name <- host.names) { + virtualHosts.put(name, host) + } + if (defaultVirtualHost == null) { + setDefaultVirtualHost(host) + } + } ->: q + + def addTransportServer(server: TransportServer) = ^ { + state match { + case RUNNING => + start(server) + case CONFIGURATION => + this.transportServers.add(server) + case _ => + throw new IllegalStateException("Cannot add a transport server when broker is: " + state) + } + } ->: q + + def removeTransportServer(server: TransportServer) = ^ { + state match { + case RUNNING => + stopTransportServerWrapException(server) + case STOPPED => + this.transportServers.remove(server) + case CONFIGURATION => + this.transportServers.remove(server) + case _ => + throw new IllegalStateException("Cannot add a transport server when broker is: " + state) + } + } ->: q + + + def getState(cb: (String) => Unit) = callback(cb) {state} ->: q + + + def addConnectUri(uri: String) = ^ { + this.connectUris = this.connectUris ::: uri::Nil + } ->: q + + def removeVirtualHost(host: VirtualHost) = ^ { + for (name <- host.names) { + virtualHosts.remove(name) + } + if (host == defaultVirtualHost) { + if (virtualHosts.isEmpty) { + defaultVirtualHost = null + } + else { + defaultVirtualHost = virtualHosts.values.iterator.next + } + } + } ->: q + + def setDefaultVirtualHost(defaultVirtualHost: VirtualHost) = ^ { + this.defaultVirtualHost = defaultVirtualHost + } ->: q + + def getName(cb: (String) => Unit) = callback(cb) { + name; + } ->: q + + + private def start(server: TransportServer): Unit = { + server.setDispatchQueue(q) + server.setAcceptListener(new BrokerAcceptListener) + server.start + } + + + final def stop: Unit = ^ { + if (state == RUNNING) { + state = STOPPING + + for (server <- transportServers) { + stop(server) + } + for (connection <- 
clientConnections) { + stop(connection) + } + for (virtualHost <- virtualHosts.values) { + stop(virtualHost) + } + state = STOPPED; + } + + } ->: q + + def getVirtualHosts(cb: (ArrayList[VirtualHost]) => Unit) = callback(cb) { + new ArrayList[VirtualHost](virtualHosts.values) + } ->: q + + def getTransportServers(cb: (ArrayList[TransportServer]) => Unit) = callback(cb) { + new ArrayList[TransportServer](transportServers) + } ->: q + + + + + def start = ^ { + if (state == CONFIGURATION) { + // We can apply defaults now + if (dataDirectory == null) { + dataDirectory = new File(IOHelper.getDefaultDataDirectory) + } + + if (defaultVirtualHost == null) { + defaultVirtualHost = new VirtualHost() + defaultVirtualHost.broker = Broker.this + defaultVirtualHost.names = DEFAULT_VIRTUAL_HOST_NAME.toString :: Nil + virtualHosts.put(DEFAULT_VIRTUAL_HOST_NAME, defaultVirtualHost) + } + + state = STARTING + + for (virtualHost <- virtualHosts.values) { + virtualHost.start + } + for (server <- transportServers) { + start(server) + } + state = RUNNING + } else { + warn("Can only start a broker that is in the " + CONFIGURATION + " state. Broker was " + state) + } + } ->: q + + private def stopTransportServerWrapException(server: TransportServer): Unit = { + try { + server.stop + } + catch { + case e: Exception => { + throw new RuntimeException(e) + } + } + } + + + /** + * Helper method to help stop broker services and log error if they fail to start. 
+   * @param server
+   */
+  private def stop(server: Service): Unit = {
+    try {
+      server.stop
+    } catch {
+      case e: Exception => {
+        warn("Could not stop " + server + ": " + e)
+        debug("Could not stop " + server + " due to: ", e)
+      }
+    }
+  }
+}
+
+
+trait QueueLifecyleListener {
+
+  /**
+   * A destination has been created
+   *
+   * @param queue
+   */
+  def onCreate(queue:Queue);
+
+  /**
+   * A destination has been destroyed
+   *
+   * @param queue
+   */
+  def onDestroy(queue:Queue);
+
+}
+
+
+
+
+object Queue {
+  // 5 * 1024 * 1024. The previous value was written 1024*1204*5, a
+  // transposition of the digits of the second factor.
+  // NOTE(review): presumably a size in bytes; the constant is not referenced
+  // in the visible part of this file - confirm against its users.
+  val maxOutboundSize = 1024*1024*5
+}
+
+/**
+ *
+ * @author Hiram Chirino
+ */
+class Queue(val destination:Destination) extends BaseRetained with Route with DeliveryTarget with DeliveryProducer {
+
+
+
+  override val queue:DispatchQueue = createQueue("queue:"+destination);
+  queue.setTargetQueue(getRandomThreadQueue)
+  setDisposer(^{
+    queue.release
+  })
+
+
+  val delivery_buffer = new DeliveryBuffer
+
+  class ConsumerState(val consumer:DeliveryTargetSession) {
+    var bound=true
+
+    def deliver(value:MessageDelivery):Unit = {
+      val delivery = MessageDelivery(value)
+      delivery.setDisposer(^{
+        ^{ completed(value) } ->:queue
+      })
+      consumer.deliver(delivery);
+      delivery.release
+    }
+
+    def completed(delivery:MessageDelivery) = {
+      // Lets get back on the readyList if we are still bound.
+ if( bound ) { + readyConsumers.addLast(this) + } + delivery_buffer.ack(delivery) + } + } + + var allConsumers = Map[DeliveryTarget,ConsumerState]() + val readyConsumers = new LinkedList[ConsumerState]() + + def connected(consumers:List[DeliveryTarget]) = bind(consumers) + def bind(consumers:List[DeliveryTarget]) = retaining(consumers) { + for ( consumer <- consumers ) { + val cs = new ConsumerState(consumer.open_session(queue)) + allConsumers += consumer->cs + readyConsumers.addLast(cs) + } + delivery_buffer.eventHandler.run + } ->: queue + + def unbind(consumers:List[DeliveryTarget]) = releasing(consumers) { + for ( consumer <- consumers ) { + allConsumers.get(consumer) match { + case Some(cs)=> + cs.bound = false + cs.consumer.close + allConsumers -= consumer + readyConsumers.remove(cs) + case None=> + } + } + } ->: queue + + def disconnected() = throw new RuntimeException("unsupported") + + def collocate(value:DispatchQueue):Unit = { + if( value.getTargetQueue ne queue.getTargetQueue ) { + println(queue.getLabel+" co-locating with: "+value.getLabel); + this.queue.setTargetQueue(value.getTargetQueue) + } + } + + + delivery_buffer.eventHandler = ^{ + while( !readyConsumers.isEmpty && !delivery_buffer.isEmpty ) { + val cs = readyConsumers.removeFirst + val delivery = delivery_buffer.receive + cs.deliver(delivery) + } + } + + + val deliveryQueue = new DeliveryCreditBufferProtocol(delivery_buffer, queue) + def open_session(producer_queue:DispatchQueue) = new DeliveryTargetSession { + val session = deliveryQueue.session(producer_queue) + val consumer = Queue.this + retain + + def deliver(delivery:MessageDelivery) = session.send(delivery) + def close = { + session.close + release + } + } + + def matches(message:MessageDelivery) = { true } + +// def open_session(producer_queue:DispatchQueue) = new ConsumerSession { +// val consumer = StompQueue.this +// val deliveryQueue = new DeliveryOverflowBuffer(delivery_buffer) +// retain +// +// def deliver(delivery:Delivery) = 
//      using(delivery) {
//        deliveryQueue.send(delivery)
//      } ->: queue
//
//    def close = {
//      release
//    }
//  }


}

/**
 * Placeholder queue for a destination. Every operation is still a stub;
 * the commented-out Java below is the implementation that remains to be
 * ported.
 */
class XQueue(val destination:Destination) {

// TODO:
//  private VirtualHost virtualHost;
//
//  Queue() {
//    this.queue = queue;
//  }
//
//  /*
//   * (non-Javadoc)
//   *
//   * @see
//   * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
//   * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
//   */
//  public void deliver(MessageDelivery message, ISourceController source) {
//    queue.add(message, source);
//  }
//
//  public final void addSubscription(final Subscription sub) {
//    queue.addSubscription(sub);
//  }
//
//  public boolean removeSubscription(final Subscription sub) {
//    return queue.removeSubscription(sub);
//  }
//
//  public void start() throws Exception {
//    queue.start();
//  }
//
//  public void stop() throws Exception {
//    if (queue != null) {
//      queue.stop();
//    }
//  }
//
//  public void shutdown(Runnable onShutdown) throws Exception {
//    if (queue != null) {
//      queue.shutdown(onShutdown);
//    }
//  }
//
//  public boolean hasSelector() {
//    return false;
//  }
//
//  public boolean matches(MessageDelivery message) {
//    return true;
//  }
//
//  public VirtualHost getBroker() {
//    return virtualHost;
//  }
//
//  public void setVirtualHost(VirtualHost virtualHost) {
//    this.virtualHost = virtualHost;
//  }
//
//  public void setDestination(Destination destination) {
//    this.destination = destination;
//  }
//
//  public final Destination getDestination() {
//    return destination;
//  }
//
//  public boolean isDurable() {
//    return true;
//  }
//
//  public static class QueueSubscription implements BrokerSubscription {
//    Subscription subscription;
//    final Queue queue;
//
//    public QueueSubscription(Queue queue) {
//      this.queue = queue;
//    }
//
//    /*
//     * (non-Javadoc)
//     *
//     * @see
//     * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
//     * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
//     */
//    public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
//      this.subscription = subscription;
//      queue.addSubscription(subscription);
//    }
//
//    /*
//     * (non-Javadoc)
//     *
//     * @see
//     * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
//     * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
//     */
//    public void disconnect(ConsumerContext context) {
//      queue.removeSubscription(subscription);
//    }
//
//    /* (non-Javadoc)
//     * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
//     */
//    public Destination getDestination() {
//      return queue.getDestination();
//    }
//  }

  // TODO:
  /** Stub: currently rejects every message. */
  def matches(message:MessageDelivery): Boolean = false

  /** Stub: currently discards the message. */
  def deliver(message:MessageDelivery): Unit = {
    // TODO:
  }

  /** The destination this queue was created for. */
  def getDestination(): Destination = destination

  /** Stub: nothing to shut down yet. */
  def shutdown: Unit = {}
}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Wed Jul 7 03:40:18 2010
@@ -0,0 +1,305 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.apollo.broker

import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
import _root_.org.apache.activemq.filter.{FilterException, BooleanExpression}
import path.{PathFilter}
import _root_.scala.collection.JavaConversions._

/**
 * A subscription that a consumer can be connected to and disconnected
 * from, and that is tied to one destination.
 */
trait BrokerSubscription {

  /** Connects the given consumer to this subscription. */
  def connect(consumer:ConsumerContext): Unit

  /** Disconnects the given consumer from this subscription. */
  def disconnect(consumer:ConsumerContext): Unit

  /** The destination this subscription refers to. */
  def getDestination():Destination

}


/**
 * Groups several subscriptions under one destination; connect and
 * disconnect are forwarded to every member in list order.
 */
class CompositeSubscription(val destination:Destination, val subscriptions:List[BrokerSubscription] ) extends BrokerSubscription {

  def connect(consumer:ConsumerContext): Unit =
    subscriptions.foreach(_.connect(consumer))

  def disconnect(consumer:ConsumerContext): Unit =
    subscriptions.foreach(_.disconnect(consumer))

  def getDestination() = destination

}

/** Log category used by the WildcardQueueSubscription class. */
object WildcardQueueSubscription extends Log {

}

/**
 * Subscription for a wildcard queue destination. It registers itself with
 * the host as a destination lifecycle listener and creates one child
 * subscription for every newly created queue whose name matches the
 * wildcard filter.
 */
class WildcardQueueSubscription(val host:VirtualHost, val destination:Destination, val consumer:ConsumerContext) extends BrokerSubscription with QueueLifecyleListener with Logging {

  protected def log = WildcardQueueSubscription

  // Filter parsed from the (wildcard) destination name.
  var filter = PathFilter.parseFilter(destination.getName())
  // Child subscriptions created so far, in creation order.
  val childSubs = new ArrayList[BrokerSubscription]()


  ///////////////////////////////////////////////////////////////////
  // BrokerSubscription interface implementation
  ///////////////////////////////////////////////////////////////////

  /**
   * Starts listening for queue creation. Only the consumer passed to the
   * constructor may be connected. Subscribing to queues that already
   * exist is not ported yet (see the TODO below).
   */
  def connect(cc:ConsumerContext): Unit = {
    assert(cc == consumer)
// TODO:
//    val domain = host.router.getDomain(Broker.QUEUE_DOMAIN);
//    val matches = domain.route(destination.getName(), null);
//    for (target <- matches) {
//      val queue = target.asInstanceOf[Queue]
//      var childSub = host.createSubscription(consumer, queue.destination);
//      childSubs.add(childSub);
//      childSub.connect(consumer);
//    }
    host.addDestinationLifecyleListener(this)
  }

  /**
   * Stops listening for queue creation, disconnects every child
   * subscription and forgets them.
   */
  def disconnect(cc:ConsumerContext): Unit = {
    assert(cc == consumer)
    host.removeDestinationLifecyleListener(this)
    childSubs.foreach(_.disconnect(cc))
    childSubs.clear()
  }

  def getDestination() : Destination = destination

  ///////////////////////////////////////////////////////////////////
  // QueueLifecyleListener interface implementation
  ///////////////////////////////////////////////////////////////////

  /**
   * When a queue matching the filter is created, creates a child
   * subscription for it and connects the consumer. A failure is logged
   * (summary at warn, with the exception at debug) and otherwise ignored.
   */
  def onCreate(queue:Queue): Unit = {
    if( filter.matches(queue.destination.getName()) ) {
      try {
        val childSub = host.createSubscription(consumer, queue.destination)
        childSubs.add(childSub)
        childSub.connect(consumer)
      } catch {
        case e:Exception =>
          warn("Could not create dynamic subscription to "+queue.destination+": "+e)
          debug("Could not create dynamic subscription to "+queue.destination+": ", e)
      }
    }
  }

  /** Nothing is done when a queue is destroyed. */
  def onDestroy(queue:Queue ): Unit = {
  }

}

/**
 * Placeholder for topic subscriptions: every operation is a stub and the
 * commented-out Java below is the implementation still to be ported.
 */
class TopicSubscription { // extends BrokerSubscription with DeliveryTarget {
  def matches(message:MessageDelivery): Boolean = false
  def deliver(message:MessageDelivery): Unit = {}
  def connect(consumer:ConsumerContext): Unit = {}
  def disconnect(consumer:ConsumerContext): Unit = {}
  def getDestination():Destination = null

//  static final boolean USE_PERSISTENT_QUEUES = true;
//
//  protected final BooleanExpression selector;
//  protected final Destination destination;
//  protected Subscription connectedSub;
//  private final VirtualHost host;
//
//  //TODO:
//  // replace this with a base interface for queue which also support non persistent use case.
//  private IFlowQueue queue;
//
//  TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
//    this.host = host;
//    this.selector = selector;
//    this.destination = destination;
//  }
//
//  @Override
//  public String toString() {
//    return IntrospectionSupport.toString(this);
//  }
//
//  /*
//   * (non-Javadoc)
//   *
//   * @see
//   * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
//   * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
//   */
//  public final void deliver(MessageDelivery message, ISourceController source) {
//    if (matches(message)) {
//      queue.add(message, source);
//    }
//  }
//
//  /*
//   * (non-Javadoc)
//   *
//   * @see org.apache.activemq.broker.DeliveryTarget#hasSelector()
//   */
//  public boolean hasSelector() {
//    return selector != null;
//  }
//
//  public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
//    if (this.connectedSub == null) {
//      if( subscription.isPersistent() ) {
//        queue = createPersistentQueue(subscription);
//      } else {
//        queue = createNonPersistentQueue(subscription);
//      }
//      queue.start();
//
//      this.connectedSub = subscription;
//      this.queue.addSubscription(connectedSub);
//      this.host.getRouter().bind(destination, this);
//    } else if (connectedSub != subscription) {
//      throw new UserAlreadyConnectedException();
//    }
//  }
//
//  private IFlowQueue createNonPersistentQueue(final ConsumerContext subscription) {
//    Flow flow = new Flow(subscription.getResourceName(), false);
//    String name = subscription.getResourceName();
//    IFlowLimiter limiter = new SizeLimiter(100, 50);
//    ExclusiveQueue queue = new ExclusiveQueue(flow, name, limiter);
//    queue.setDrain( new QueueDispatchTarget() {
//      public void drain(MessageDelivery elem, ISourceController controller) {
//        subscription.add(elem, controller);
//      }
//    });
//    return queue;
//  }
//
//  private IFlowQueue createPersistentQueue(ConsumerContext subscription) {
//    ExclusivePersistentQueue queue = host.getQueueStore().createExclusivePersistentQueue();
//    return queue;
//  }
//
//  @SuppressWarnings("unchecked")
//  private void destroyPersistentQueue(IFlowQueue queue) {
//    ExclusivePersistentQueue pq = (ExclusivePersistentQueue) queue;
//    host.getQueueStore().deleteQueue(pq.getDescriptor());
//  }
//
//  public synchronized void disconnect(final ConsumerContext subscription) {
//    if (connectedSub != null && connectedSub == subscription) {
//      this.host.getRouter().unbind(destination, this);
//      this.queue.removeSubscription(connectedSub);
//      this.connectedSub = null;
//
//      queue.stop();
//      if( USE_PERSISTENT_QUEUES ) {
//        destroyPersistentQueue(queue);
//      }
//      queue=null;
//    }
//  }
//
//
//
//  public boolean matches(MessageDelivery message) {
//    if (selector == null) {
//      return true;
//    }
//
//    MessageEvaluationContext selectorContext = message.createMessageEvaluationContext();
//    selectorContext.setDestination(destination);
//    try {
//      return (selector.matches(selectorContext));
//    } catch (FilterException e) {
//      e.printStackTrace();
//      return false;
//    }
//  }
//
//  /*
//   * (non-Javadoc)
//   *
//   * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
//   */
//  public Destination getDestination() {
//    return destination;
//  }


}

/**
 * Durable subscription on a destination with an optional selector.
 * Only selector matching is functional; delivery, connect and disconnect
 * are stubs whose intended logic is kept in the commented-out code.
 *
 * @param host        virtual host the subscription belongs to
 * @param destination destination the subscription is attached to
 * @param selector    message selector, or null to accept every message
 */
class DurableSubscription(val host:VirtualHost, val destination:Destination, val selector:BooleanExpression) { // extends BrokerSubscription with DeliveryTarget {

//  private final IQueue queue;
//  private Subscription connectedSub;
  var started = false
// TODO:
//  this.host.router.bind(destination, this);

  /** The destination this subscription is attached to. */
  def getDestination(): Destination = destination

  /** Stub: the message is currently dropped (queue hand-off not ported). */
  def deliver(message:MessageDelivery ): Unit = {
// TODO:
//    queue.add(message, source);
  }

  /** Stub: connecting a consumer currently has no effect. */
  def connect(subscription:ConsumerContext): Unit = {
// TODO:
//    if (this.connectedSub == null) {
//      this.connectedSub = subscription;
//      queue.addSubscription(connectedSub);
//    } else if (connectedSub != subscription) {
//      throw new UserAlreadyConnectedException();
//    }
  }

  /** Stub: disconnecting a consumer currently has no effect. */
  def disconnect(subscription:ConsumerContext): Unit = {
// TODO:
//    if (connectedSub != null && connectedSub == subscription) {
//      queue.removeSubscription(connectedSub);
//      connectedSub = null;
//    }
  }

  /**
   * Tells whether the message passes this subscription's selector.
   *
   * Without a selector every message matches. Otherwise the message's
   * evaluation context has its destination set to this subscription's
   * destination before the selector is evaluated; a FilterException is
   * printed to stderr and treated as "no match".
   */
  def matches(message:MessageDelivery): Boolean = {
    if (selector == null) {
      true
    } else {
      val selectorContext = message.message.messageEvaluationContext
      selectorContext.setDestination(destination)
      try {
        selector.matches(selectorContext)
      } catch {
        case e:FilterException =>
          e.printStackTrace()
          false
      }
    }
  }

}