activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1127907 - in /activemq/activemq-apollo/trunk: apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala pom.xml
Date Thu, 26 May 2011 13:43:57 GMT
Author: chirino
Date: Thu May 26 13:43:57 2011
New Revision: 1127907

URL: http://svn.apache.org/viewvc?rev=1127907&view=rev
Log:
Simplify the web jaxrs resource implementation a bit by making heavier use of Futures.

Modified:
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1127907&r1=1127906&r2=1127907&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Thu May 26 13:43:57 2011
@@ -23,9 +23,9 @@ import org.apache.activemq.apollo.dto._
 import java.{lang => jl}
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.broker._
-import scala.util.continuations._
 import scala.collection.Iterable
-import javax.xml.soap.SOAPMessage
+import org.apache.activemq.apollo.util.{Failure, Success, Dispatched, Result}
+import scala.Some
 
 /**
  * <p>
@@ -39,16 +39,98 @@ import javax.xml.soap.SOAPMessage
 @Produces(Array("application/json", "application/xml","text/xml", "text/html;qs=5"))
 case class BrokerResource() extends Resource() {
 
+  type FutureResult[T] = Future[Result[T, Throwable]]
+
+  def FutureResult[T]() = Future[Result[T, Throwable]]()
+
+  private def sync[T](dispached:Dispatched)(func: =>FutureResult[T]):FutureResult[T] = {
+    val rc = Future[Result[T, Throwable]]()
+    dispached.dispatch_queue.apply {
+      try {
+        func.onComplete(x=> rc.apply(x))
+      } catch {
+        case e:Throwable => rc.apply(Failure(e))
+      }
+    }
+    rc
+  }
+
+
+  def sync_all[T,D<:Dispatched](values:Iterable[D])(func: (D)=>FutureResult[T]) = {
+    Future.all {
+      values.map { value=>
+        sync(value) {
+          func(value)
+        }
+      }
+    }
+  }
+
+  private implicit def to_local_router(host:VirtualHost):LocalRouter = {
+    host.router.asInstanceOf[LocalRouter]
+  }
+
+  private implicit def wrap_future_result[T](value:T):FutureResult[T] = {
+    val rc = FutureResult[T]()
+    rc.apply(Success(value))
+    rc
+  }
+
+  private implicit def unwrap_future_result[T](value:FutureResult[T]):T = {
+    value.await() match {
+      case Success(value) => value
+      case Failure(value) => throw value
+    }
+  }
+
+  private def with_broker[T](func: (org.apache.activemq.apollo.broker.Broker)=>FutureResult[T]):FutureResult[T] = {
+    BrokerRegistry.list.headOption match {
+      case Some(broker)=>
+        sync(broker) {
+          func(broker)
+        }
+      case None=>
+        result(NOT_FOUND)
+    }
+  }
+
+  private def with_virtual_host[T](id:String)(func: (VirtualHost)=>FutureResult[T]):FutureResult[T] = {
+    with_broker { broker =>
+      broker.virtual_hosts.valuesIterator.find( _.id == id) match {
+        case Some(virtualHost)=>
+          sync(virtualHost) {
+            func(virtualHost)
+          }
+        case None=>
+          result(NOT_FOUND)
+      }
+    }
+  }
+
+  private def with_connection[T](id:Long)(func: BrokerConnection=>FutureResult[T]):FutureResult[T] = {
+    with_broker { broker =>
+      broker.connectors.flatMap{ _.connections.get(id) }.headOption match {
+        case Some(connection:BrokerConnection) =>
+          sync(connection) {
+            func(connection)
+          }
+        case None=>
+          result(NOT_FOUND)
+      }
+    }
+  }
+
+
   @Path("config")
   def config_resource:ConfigurationResource = {
-    with_broker { case (broker, cb) =>
-      cb(Some(ConfigurationResource(this, broker.config)))
+    with_broker { broker =>
+      ConfigurationResource(this, broker.config)
     }
   }
 
   @GET
   def get_broker():BrokerStatusDTO = {
-    with_broker { case (broker, cb) =>
+    with_broker { broker =>
       val result = new BrokerStatusDTO
 
       result.id = broker.id
@@ -71,17 +153,15 @@ case class BrokerResource() extends Reso
           result.connections.add( new LongIdLabeledDTO(id, connection.transport.getRemoteAddress ) )
         }
       }
-      cb(Some(result))
+      result
     }
   }
 
   @GET
   @Path("queue-metrics")
   def get_queue_metrics(): AggregateQueueMetricsDTO = {
-    with_broker { case (broker, cb) =>
-      get_queue_metrics(broker).onComplete{ metrics=>
-        cb(Some(metrics))
-      }
+    with_broker { broker =>
+      get_queue_metrics(broker)
     }
   }
 
@@ -131,64 +211,64 @@ case class BrokerResource() extends Reso
     }
   }
 
-  def get_queue_metrics(broker:Broker):Future[AggregateQueueMetricsDTO] = {
-    val metrics = Future.all {
-      broker.virtual_hosts.values.map { host=>
-        host.dispatch_queue.flatFuture{ get_queue_metrics(host) }
-      }
+  def get_queue_metrics(broker:Broker):FutureResult[AggregateQueueMetricsDTO] = {
+    val metrics = sync_all(broker.virtual_hosts.values) { host =>
+      get_queue_metrics(host)
     }
-    metrics.map( x=> aggregate_queue_metrics(x) )
+    metrics.map( x=> Success(aggregate_queue_metrics(x.flatMap(_.success_option)) ))
   }
 
-  def get_queue_metrics(virtualHost:VirtualHost):Future[AggregateQueueMetricsDTO] = {
-    val metrics = Future.all{
-      virtualHost.router.asInstanceOf[LocalRouter].queues_by_id.values.map { queue=>
-        queue.dispatch_queue.future { get_queue_metrics(queue) }
-      }
+  def get_queue_metrics(host:VirtualHost):FutureResult[AggregateQueueMetricsDTO] = {
+    val router:LocalRouter = host
+    val queues: Iterable[Queue] = router.queues_by_id.values
+    val metrics = sync_all(queues) { queue =>
+      get_queue_metrics(queue)
     }
-    metrics.map( x=> aggregate_queue_metrics(x) )
+    metrics.map( x=> Success(aggregate_queue_metrics(x.flatMap(_.success_option))) )
   }
 
 
   @GET @Path("virtual-hosts/{id}")
   def virtual_host(@PathParam("id") id : String):VirtualHostStatusDTO = {
-    with_virtual_host(id) { case (virtual_host,cb) =>
+    with_virtual_host(id) { host =>
       val result = new VirtualHostStatusDTO
-      result.id = virtual_host.id
-      result.state = virtual_host.service_state.toString
-      result.state_since = virtual_host.service_state.since
-      result.store = virtual_host.store!=null
+      result.id = host.id
+      result.state = host.service_state.toString
+      result.state_since = host.service_state.since
+      result.store = host.store!=null
 
-      virtual_host.router.asInstanceOf[LocalRouter].topic_domain.destinations.foreach { node=>
+      val router:LocalRouter = host
+
+      router.topic_domain.destinations.foreach { node=>
         result.topics.add(new LongIdLabeledDTO(node.id, node.name))
       }
 
-      virtual_host.router.asInstanceOf[LocalRouter].queue_domain.destinations.foreach { node=>
+      router.queue_domain.destinations.foreach { node=>
         result.queues.add(new LongIdLabeledDTO(node.id, node.binding.label))
       }
 
-      cb(Some(result))
+      result
     }
   }
 
   @GET @Path("virtual-hosts/{id}/queue-metrics")
   def virtual_host_queue_metrics(@PathParam("id") id : String): AggregateQueueMetricsDTO = {
-    with_virtual_host(id) { case (virtualHost,cb) =>
-      get_queue_metrics(virtualHost).onComplete { metrics=>
-        cb(Some(metrics))
-      }
+    with_virtual_host(id) { virtualHost =>
+      get_queue_metrics(virtualHost)
     }
   }
 
   @GET @Path("virtual-hosts/{id}/store")
   def store(@PathParam("id") id : String):StoreStatusDTO = {
-    with_virtual_host(id) { case (virtualHost,cb) =>
+    with_virtual_host(id) { virtualHost =>
       if(virtualHost.store!=null) {
+        val rc = FutureResult[StoreStatusDTO]()
         virtualHost.store.get_store_status { status =>
-          cb(Some(status))
+          rc(Success(status))
         }
+        rc
       } else {
-        cb(None)
+        result(NOT_FOUND)
       }
     }
   }
@@ -211,50 +291,51 @@ case class BrokerResource() extends Reso
 
   @GET @Path("virtual-hosts/{id}/topics/{dest}")
   def destination(@PathParam("id") id : String, @PathParam("dest") dest : Long):TopicStatusDTO = {
-    with_virtual_host(id) { case (virtualHost,cb) =>
-      cb(virtualHost.router.asInstanceOf[LocalRouter].topic_domain.destination_by_id.get(dest) map { node=>
-        val result = new TopicStatusDTO
-        result.id = node.id
-        result.name = node.name
-        result.config = node.config
+    with_virtual_host(id) { host =>
 
-        node.durable_subscriptions.foreach { q=>
-          result.durable_subscriptions.add(new LongIdLabeledDTO(q.id, q.binding.label))
-        }
-        node.consumers.foreach { consumer=>
-          consumer match {
-            case queue:Queue =>
-              result.consumers.add(link(queue))
-            case _ =>
-              consumer.connection.foreach{c=>
-                result.consumers.add(link(c))
-              }
-          }
-        }
-        node.producers.flatMap( _.connection ).foreach { connection=>
-          result.producers.add(link(connection))
+      val router:LocalRouter = host
+      val node = router.topic_domain.destination_by_id.get(dest).getOrElse(result(NOT_FOUND))
+      val rc = new TopicStatusDTO
+      rc.id = node.id
+      rc.name = node.name
+      rc.config = node.config
+
+      node.durable_subscriptions.foreach { q=>
+        rc.durable_subscriptions.add(new LongIdLabeledDTO(q.id, q.binding.label))
+      }
+      node.consumers.foreach { consumer=>
+        consumer match {
+          case queue:Queue =>
+            rc.consumers.add(link(queue))
+          case _ =>
+            consumer.connection.foreach{c=>
+              rc.consumers.add(link(c))
+            }
         }
+      }
+      node.producers.flatMap( _.connection ).foreach { connection=>
+        rc.producers.add(link(connection))
+      }
 
-        result
-      })
+      rc
     }
   }
 
   @GET @Path("virtual-hosts/{id}/all-queues/{queue}")
   def queue(@PathParam("id") id : String, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean):QueueStatusDTO = {
-    with_virtual_host(id) { case (virtualHost,cb) =>
-      reset {
-        val queue = virtualHost.router.get_queue(qid)
-        status(queue, entries, cb)
-      }
+    with_virtual_host(id) { host =>
+      val router:LocalRouter = host
+      val queue = router.queues_by_id.get(qid)
+      status(queue, entries)
     }
   }
 
   @GET @Path("virtual-hosts/{id}/queues/{queue}")
   def destination_queue(@PathParam("id") id : String, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean ):QueueStatusDTO = {
-    with_virtual_host(id) { case (virtualHost,cb) =>
-      val queue = virtualHost.router.asInstanceOf[LocalRouter].queue_domain.destination_by_id.get(qid)
-      status(queue, entries, cb)
+    with_virtual_host(id) { host =>
+      val router:LocalRouter = host
+      val queue = router.queue_domain.destination_by_id.get(qid)
+      status(queue, entries)
     }
   }
 
@@ -292,11 +373,10 @@ case class BrokerResource() extends Reso
     rc
   }
 
-  def status(qo:Option[Queue], entries:Boolean=false, cb:Option[QueueStatusDTO]=>Unit):Unit = if(qo==None) {
-    cb(None)
-  } else {
-    val q = qo.get
-    q.dispatch_queue {
+  def status(qo:Option[Queue], entries:Boolean=false) = qo match {
+    case None=> result(NOT_FOUND)
+    case Some(q)=> sync(q) {
+
       val rc = new QueueStatusDTO
       rc.id = q.id
       rc.destination = q.binding.binding_dto
@@ -351,7 +431,7 @@ case class BrokerResource() extends Reso
         }
         rc.consumers.add(status)
       }
-      cb(Some(rc))
+      rc
     }
   }
 
@@ -365,9 +445,9 @@ case class BrokerResource() extends Reso
 
   @GET @Path("connectors/{id}")
   def connector(@PathParam("id") id : String):ConnectorStatusDTO = {
-    with_broker { case (broker, cb) =>
+    with_broker { broker =>
       broker.connectors.find(_.id == id) match {
-        case None=> cb(None)
+        case None=> result(NOT_FOUND)
         case Some(connector)=>
 
           val result = new ConnectorStatusDTO
@@ -381,7 +461,7 @@ case class BrokerResource() extends Reso
             result.connections.add( new LongIdLabeledDTO(id, connection.transport.getRemoteAddress ) )
           }
 
-          cb(Some(result))
+          result
       }
     }
   }
@@ -389,7 +469,7 @@ case class BrokerResource() extends Reso
 
   @GET @Path("connections")
   def connections:LongIdListDTO = {
-    with_broker { case (broker, cb) =>
+    with_broker { broker =>
       val rc = new LongIdListDTO
 
       broker.connectors.foreach { connector=>
@@ -399,19 +479,7 @@ case class BrokerResource() extends Reso
         }
       }
       
-      cb(Some(rc))
-    }
-  }
-
-  def with_connection[T](id:Long)(func: BrokerConnection=>T):T = {
-    with_broker { case (broker, cb) =>
-      broker.connectors.flatMap{ _.connections.get(id) }.headOption match {
-        case None => cb(None)
-        case Some(connection:BrokerConnection) =>
-          connection.dispatch_queue {
-            cb(Some(func(connection)))
-          }
-      }
+      rc
     }
   }
 
@@ -445,39 +513,10 @@ case class BrokerResource() extends Reso
 
     // do the the exit async so that we don't
     // kill the current request.
-    Broker.BLOCKABLE_THREAD_POOL {
-      Thread.sleep(200);
+    Broker.BLOCKABLE_THREAD_POOL.apply {
+      Thread.sleep(200)
       System.exit(0)
     }
   }
 
-  def concurrent_map[T,R](values:Iterable[T])(dqf:(T)=>DispatchQueue)(func:T=>R) = {
-    Future.all( values.map { t=>
-      dqf(t).future { func(t) }
-    })
-  }
-
-  private def with_broker[T](func: (org.apache.activemq.apollo.broker.Broker, Option[T]=>Unit)=>Unit):T = {
-    BrokerRegistry.list.headOption match {
-      case None=> result(NOT_FOUND)
-      case Some(broker)=>
-        val f = Future[Option[T]]()
-        broker.dispatch_queue {
-          func(broker, f)
-        }
-        f().getOrElse(result(NOT_FOUND))
-    }
-  }
-
-  private def with_virtual_host[T](id:String)(func: (VirtualHost, Option[T]=>Unit)=>Unit):T = {
-    with_broker { case (broker, cb) =>
-      broker.virtual_hosts.valuesIterator.find( _.id == id) match {
-        case Some(virtualHost)=>
-          virtualHost.dispatch_queue {
-            func(virtualHost, cb)
-          }
-        case None=> cb(None)
-      }
-    }
-  }
 }

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala?rev=1127907&r1=1127906&r2=1127907&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala Thu May 26 13:43:57 2011
@@ -49,7 +49,7 @@ abstract class Resource(private val pare
     this.uri_info = parent.uri_info
   }
 
-  def result[T](value:Status, message:Any=null):T = {
+  def result(value:Status, message:Any=null):Nothing = {
     val response = Response.status(value)
     if( message!=null ) {
       response.entity(message)

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1127907&r1=1127906&r2=1127907&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Thu May 26 13:43:57 2011
@@ -96,7 +96,7 @@
     <xbean-version>3.4</xbean-version>
     <felix-version>1.0.0</felix-version>
 
-    <hawtdispatch-version>1.2</hawtdispatch-version>
+    <hawtdispatch-version>1.3-SNAPSHOT</hawtdispatch-version>
     <hawtbuf-version>1.4</hawtbuf-version>
     
     <jdbm-version>2.0.1</jdbm-version>



Mime
View raw message