Author: chirino Date: Mon Jun 11 22:50:59 2012 New Revision: 1349060 URL: http://svn.apache.org/viewvc?rev=1349060&view=rev Log: Added a simple JVM level membership monitor so that test brokers can use dynamic ports even in the test configurations. Added: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index - copied, changed from r1348133, activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/MembershipMonitor.scala activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/JVMMembershipMonitorDTO.java - copied, changed from r1348133, activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/MembershipMonitorDTO.java Removed: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ClusterMembershipMonitor.scala Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterServiceDTO.java activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index (from r1348133, activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index&p1=activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index&r1=1348133&r2=1349060&rev=1349060&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index (original) +++ activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index Mon Jun 11 22:50:59 2012 @@ -14,5 +14,4 @@ ## See the License for the specific language governing permissions and ## limitations under the License. ## --------------------------------------------------------------------------- -NetworkManagerDTO -BridgeDTO \ No newline at end of file +org.apache.activemq.apollo.broker.network.JVMMembershipMonitorFactory \ No newline at end of file Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index?rev=1349060&r1=1349059&r2=1349060&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index (original) +++ activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index Mon Jun 11 22:50:59 2012 @@ -15,4 +15,6 @@ ## limitations under the License. ## --------------------------------------------------------------------------- NetworkManagerDTO -BridgeDTO \ No newline at end of file +BridgeDTO +MembershipMonitorDTO +JVMMembershipMonitorDTO \ No newline at end of file Added: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/MembershipMonitor.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/MembershipMonitor.scala?rev=1349060&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/MembershipMonitor.scala (added) +++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/MembershipMonitor.scala Mon Jun 11 22:50:59 2012 @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.broker.network + +import dto.{MembershipMonitorDTO, NetworkManagerDTO, JVMMembershipMonitorDTO, ClusterMemberDTO} +import org.fusesource.hawtdispatch._ +import org.apache.activemq.apollo.dto.{CustomServiceDTO, VirtualHostDTO} +import java.util.concurrent.TimeUnit +import org.apache.activemq.apollo.broker.{BrokerRegistry, Broker} +import collection.mutable.HashSet +import org.apache.activemq.apollo.util._ + +trait MembershipMonitor extends Service { + var listener:MembershipListener = _ +} + +/** + *
+ *
+ * + * @author Hiram Chirino + */ +object MembershipMonitorFactory { + + trait SPI { + def create(broker:Broker, dto:MembershipMonitorDTO):MembershipMonitor + } + + val finder = new ClassFinder[SPI]("META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index",classOf[SPI]) + + def create(broker:Broker, dto:MembershipMonitorDTO):MembershipMonitor = { + if( dto == null ) { + return null + } + finder.singletons.foreach { provider=> + val rc = provider.create(broker, dto) + if( rc!=null ) { + return rc; + } + } + return null + } +} + + +trait MembershipListener { + def on_membership_change(members:collection.Set[ClusterMemberDTO]) +} + +case class StaticMembershipMonitor(members:collection.Set[ClusterMemberDTO]) extends BaseService with MembershipMonitor { + val dispatch_queue = createQueue("static cluster membership manager") + protected def _start(on_completed: Task) = { + dispatch_queue { + listener.on_membership_change(members) + } + on_completed.run() + } + protected def _stop(on_completed: Task) = { + on_completed.run() + } +} + +object JVMMembershipMonitorFactory extends MembershipMonitorFactory.SPI { + def create(broker: Broker, dto: MembershipMonitorDTO): MembershipMonitor = dto match { + case dto:JVMMembershipMonitorDTO=> new JVMMembershipMonitor + case _ => null + } +} + +class JVMMembershipMonitor extends BaseService with MembershipMonitor { + val dispatch_queue = createQueue("jvm cluster membership manager") + protected def _start(on_completed: Task) = { + schedule_reoccurring(1, TimeUnit.SECONDS) { + val brokers = HashSet[ClusterMemberDTO]() + for( broker <- BrokerRegistry.list() ) { + import collection.JavaConversions._ + broker.config.services.foreach(_ match { + case x:NetworkManagerDTO=> + if(x.self!=null ) { + if(NetworkManager.has_variables(x.self.id)) { + for( host <- broker.virtual_hosts.values ) { + var resolved = NetworkManager.resolve_variables(x.self, broker, host) + if( !NetworkManager.has_variables(resolved) ) { + brokers += resolved + } else { + println("not resolved") + } + } + } else { + var resolved = NetworkManager.resolve_variables(x.self, broker, null) + if( !NetworkManager.has_variables(resolved) ) { + brokers += resolved + } else { + println("not resolved") + } + } + } + case _ => + }) + } + listener.on_membership_change(brokers) + } + on_completed.run() + } + + protected def _stop(on_completed: Task) = on_completed.run() +} + +case class MulitMonitor(monitors:Seq[MembershipMonitor]) extends BaseService with MembershipMonitor { + + val dispatch_queue = createQueue("static cluster membership manager") + + case class MonitorData(monitor:MembershipMonitor) { + var last:collection.Set[ClusterMemberDTO] = HashSet[ClusterMemberDTO](); + } + + val monitor_data = monitors.map(MonitorData(_)) + + protected def _start(on_completed: Task) = { + val tracker = new LoggingTracker("membership monitor start") + for( data <- monitor_data ) { + data.monitor.listener = new MembershipListener { + def on_membership_change(members: collection.Set[ClusterMemberDTO]) = dispatch_queue { + if( data.last != members ) { + data.last = members + fire_changes + } + } + } + tracker.start(data.monitor) + } + tracker.callback(on_completed) + } + + def fire_changes = { + val members = HashSet[ClusterMemberDTO]() + monitor_data.foreach(x=> members ++= x.last ) + listener.on_membership_change(members) + } + + protected def _stop(on_completed: Task) = { + val tracker = new LoggingTracker("membership monitor stop") + for( data <- monitor_data ) { + tracker.stop(data.monitor) + } + tracker.callback(^{ + on_completed.run() + }) + } +} \ No newline at end of file Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala?rev=1349060&r1=1349059&r2=1349060&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala (original) +++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala Mon Jun 11 22:50:59 2012 @@ -22,8 +22,9 @@ import org.apache.activemq.apollo.broker import CollectionsSupport._ import java.util.concurrent.TimeUnit._ import collection.mutable.{LinkedHashMap, HashSet, ListBuffer, HashMap} -import org.apache.activemq.apollo.broker.{Broker, CustomServiceFactory} import org.apache.activemq.apollo.dto.{LoadStatusDTO, CustomServiceDTO} +import org.apache.activemq.apollo.broker.{AcceptingConnector, VirtualHost, Broker, CustomServiceFactory} +import java.net.InetSocketAddress /** *
@@ -41,16 +42,60 @@ object NetworkManagerFactory extends Cus
}
}
-object NetworkManager extends Log
+object NetworkManager extends Log {
+
+ def has_variables(x:String) = x.contains("{{"):Boolean
-class NetworkManager(broker: Broker) extends BaseService with ClusterMembershipListener with BrokerLoadListener {
+ def has_variables(dto:ClusterMemberDTO):Boolean = {
+ import collection.JavaConversions._
+ has_variables(dto.id) || dto.services.foldLeft(false){ case (x,y) =>
+ x || has_variables(y.address)
+ }
+ }
+
+ def resolve_variables(dto:ClusterMemberDTO, broker:Broker, host:VirtualHost):ClusterMemberDTO = {
+ import collection.JavaConversions._
+ def resolve(x:String) = if( !x.contains("{{") ) { x } else {
+ var rc = x;
+ if( host!=null ) {
+ rc = rc.replaceAllLiterally("{{host}}", host.id)
+ }
+ if( broker.web_server!=null && broker.web_server.uris()!=null && !broker.web_server.uris().isEmpty) {
+ rc = rc.replaceAllLiterally("{{web_admin.url}}", broker.web_server.uris()(0).toString.stripSuffix("/"))
+ }
+ for( (id, connector) <- broker.connectors ) {
+ connector match {
+ case connector:AcceptingConnector =>
+ connector.socket_address match {
+ case address:InetSocketAddress =>
+ rc = rc.replaceAllLiterally("{{connector."+id+".port}}", ""+address.getPort)
+ }
+ case _ =>
+ }
+ }
+ rc
+ }
+
+ val rc = new ClusterMemberDTO
+ rc.id = resolve(dto.id)
+ for( service <- dto.services) {
+ val s = new ClusterServiceDTO
+ s.kind = service.kind
+ s.address = resolve(service.address)
+ rc.services.add(s)
+ }
+ rc
+ }
+}
+
+class NetworkManager(broker: Broker) extends BaseService with MembershipListener with BrokerLoadListener {
import NetworkManager._
val dispatch_queue = createQueue("bridge manager")
var config = new NetworkManagerDTO
- var membership_monitor:ClusterMembershipMonitor = _
- var members = Set[ClusterMemberDTO]()
+ var membership_monitor:MembershipMonitor = _
+ var members = collection.Set[ClusterMemberDTO]()
var members_by_id = HashMap[String, ClusterMemberDTO]()
var load_monitor: BrokerLoadMonitor = _
var metrics_map = HashMap[String, BrokerMetrics]()
@@ -64,8 +109,22 @@ class NetworkManager(broker: Broker) ext
import collection.JavaConversions._
// TODO: also support dynamic membership discovery..
- membership_monitor = StaticClusterMembershipMonitor(config.members.toSet)
+ var monitors = List[MembershipMonitor]()
+ var static_set = config.members.toSet
+ if( !has_variables(config.self) ) {
+ static_set += config.self
+ }
+
+ monitors ::= StaticMembershipMonitor(static_set)
+
+ for( monitor_dto <- config.membership_monitors ) {
+ var monitor = MembershipMonitorFactory.create(broker, monitor_dto)
+ if(monitor!=null) {
+ monitors ::= monitor
+ }
+ }
+ membership_monitor = MulitMonitor(monitors)
membership_monitor.listener = this
membership_monitor.start(NOOP)
@@ -84,7 +143,7 @@ class NetworkManager(broker: Broker) ext
on_completed.run()
}
- def on_cluster_change(value: Set[ClusterMemberDTO]) = dispatch_queue {
+ def on_membership_change(value: collection.Set[ClusterMemberDTO]) = dispatch_queue {
val (added, _, removed) = diff(members, value)
for( m <- removed ) {
load_monitor.remove(m)
@@ -157,11 +216,21 @@ class NetworkManager(broker: Broker) ext
}
- def local_broker_id = config.self
+ def is_local_broker_id(id:String):Boolean = {
+ if( has_variables(config.self.id) ) {
+ for( host <- broker.virtual_hosts.values ) {
+ if( config.self.id.replaceAllLiterally("{{host}}", host.id) == id )
+ return true
+ }
+ false
+ } else {
+ config.self.id == id
+ }
+ }
- def can_bridge_from(broker:String):Boolean = broker==local_broker_id
+ def can_bridge_from(broker:String):Boolean = is_local_broker_id(broker)
def can_bridge_to(broker:String):Boolean = {
- if ( broker == local_broker_id) {
+ if ( is_local_broker_id(broker) ) {
OptionSupport(config.duplex).getOrElse(false)
} else {
true
Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java?rev=1349060&r1=1349059&r2=1349060&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java Mon Jun 11 22:50:59 2012
@@ -34,4 +34,24 @@ public class ClusterMemberDTO {
@XmlElement(name="service")
public ArrayList