activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961192 - in /activemq/sandbox/activemq-apollo-actor: activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/ activemq-transport/src/main/java/org/apache/activemq/transport/ a...
Date Wed, 07 Jul 2010 04:16:32 GMT
Author: chirino
Date: Wed Jul  7 04:16:32 2010
New Revision: 961192

URL: http://svn.apache.org/viewvc?rev=961192&view=rev
Log:
web module improvements
- now exposes connection status
- better jaxrs resource organization

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/ConfigurationResource.scala
      - copied, changed from r961191, activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
      - copied, changed from r961191, activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/StatusResource.scala
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/StatusResource.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java?rev=961192&r1=961191&r2=961192&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java
Wed Jul  7 04:16:32 2010
@@ -34,11 +34,17 @@ import javax.xml.bind.annotation.XmlRoot
 public class BrokerSummaryDTO extends StringIdDTO {
 
     /**
-     * The latest revision of the broker config.
+     * Is a running broker accessible via management API calls?
      */
     @JsonProperty
 	@XmlAttribute
-	public Integer rev;
+	public boolean manageable;
 
+    /**
+     * Is the broker configuration accessible via API calls? 
+     */
+    @JsonProperty
+	@XmlAttribute
+	public boolean configurable;
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java?rev=961192&r1=961191&r2=961192&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
Wed Jul  7 04:16:32 2010
@@ -52,6 +52,13 @@ public class ConnectionStatusDTO extends
      */
     @JsonProperty
 	@XmlAttribute
+	public String transport;
+
+    /**
+     * The protocol the connection is using.
+     */
+    @JsonProperty
+	@XmlAttribute
 	public String protocol;
 
     /**

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961192&r1=961191&r2=961192&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Wed Jul  7 04:16:32 2010
@@ -47,6 +47,8 @@ public class TcpTransport extends BaseSe
     private static final Log LOG = LogFactory.getLog(TcpTransport.class);
 
     private Map<String, Object> socketOptions;
+    private long writeCounter;
+    private long readCounter;
 
     abstract static class SocketState {
         void onStop(Runnable onCompleted) {
@@ -407,8 +409,11 @@ public class TcpTransport extends BaseSe
             while (socketState.is(CONNECTED.class) ) {
 
                 // if we have a pending write that is being sent over the socket...
-                if (outbound_buffer.remaining()!=0) {
+                int remaining = outbound_buffer.remaining();
+                if (remaining!=0) {
                     channel.write(outbound_buffer);
+                    int count = remaining - outbound_buffer.remaining();
+                    writeCounter += count;
                     if (outbound_buffer.remaining() != 0) {
                         return false;
                     }
@@ -477,6 +482,7 @@ public class TcpTransport extends BaseSe
                 } else if (count == 0) {
                     return;
                 }
+                readCounter += count;
             }
 
             Object command = this.wireformat.unmarshalNB(readBuffer);
@@ -502,6 +508,21 @@ public class TcpTransport extends BaseSe
         return remoteAddress;
     }
 
+    /**
+     * @return The number of bytes sent by the transport.
+     */
+    public long getWriteCounter() {
+        return writeCounter;
+    }
+
+    /**
+     * @return The number of bytes received by the transport.
+     */
+    public long getReadCounter() {
+        return readCounter;
+    }
+
+
     public <T> T narrow(Class<T> target) {
         if (target.isAssignableFrom(getClass())) {
             return target.cast(this);
@@ -534,6 +555,10 @@ public class TcpTransport extends BaseSe
         }
     }
 
+    public String getTypeId() {
+        return "tcp";
+    }
+
     public void reconnect(URI uri) {
         throw new UnsupportedOperationException();
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=961192&r1=961191&r2=961192&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
Wed Jul  7 04:16:32 2010
@@ -92,6 +92,16 @@ public interface Transport extends Servi
     String getRemoteAddress();
 
     /**
+     * @return The number of bytes sent by the transport.
+     */
+    long getWriteCounter();
+
+    /**
+     * @return The number of bytes received by the transport.
+     */
+    long getReadCounter();
+
+    /**
      * Indicates if the transport can handle faults
      * 
      * @return true if fault tolerant
@@ -122,4 +132,8 @@ public interface Transport extends Servi
      */
     void reconnect(URI uri);
 
+    /**
+     * @return the identifier for the transport type.  Example "tcp" for the tcp transport.
+     */
+    String getTypeId();
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961192&r1=961191&r2=961192&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
Wed Jul  7 04:16:32 2010
@@ -149,6 +149,14 @@ public class TransportFilter implements 
         return next.getRemoteAddress();
     }
 
+    public long getReadCounter() {
+        return next.getReadCounter();
+    }
+
+    public long getWriteCounter() {
+        return next.getWriteCounter();
+    }
+
     /**
      * @return
      * @see org.apache.activemq.transport.Transport#isFaultTolerant()
@@ -165,6 +173,10 @@ public class TransportFilter implements 
         return next.isConnected();
     }
 
+    public String getTypeId() {
+        return next.getTypeId();
+    }
+
     public void reconnect(URI uri) {
         next.reconnect(uri);
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961192&r1=961191&r2=961192&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
Wed Jul  7 04:16:32 2010
@@ -49,6 +49,9 @@ public class PipeTransport implements Tr
     private CustomDispatchSource<Object,LinkedList<Object>> dispatchSource;
     private boolean connected;
 
+    private long writeCounter = 0;
+    private long readCounter = 0;
+
     public PipeTransport(PipeTransportServer server) {
         this.server = server;
     }
@@ -89,6 +92,7 @@ public class PipeTransport implements Tr
                                 if (wireformat != null && marshal) {
                                     listener.onTransportCommand(wireformat.unmarshal((Buffer) o));
                                 } else {
+                                    readCounter ++;
                                     listener.onTransportCommand(o);
                                 }
                             }
@@ -180,10 +184,25 @@ public class PipeTransport implements Tr
     }
 
     private void transmit(Object command) {
+        writeCounter++;
         outbound++;
         peer.dispatchSource.merge(command);
     }
 
+    /**
+     * @return The number of bytes sent by the transport.
+     */
+    public long getWriteCounter() {
+        return writeCounter;
+    }
+
+    /**
+     * @return The number of bytes received by the transport.
+     */
+    public long getReadCounter() {
+        return readCounter;
+    }
+
     public String getRemoteAddress() {
         return remoteAddress;
     }
@@ -206,6 +225,10 @@ public class PipeTransport implements Tr
         throw new UnsupportedOperationException();
     }
 
+    public String getTypeId() {
+        return "pipe";
+    }
+
     public void setRemoteAddress(String remoteAddress) {
         this.remoteAddress = remoteAddress;
         if (name == null) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala?rev=961192&r1=961191&r2=961192&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala
Wed Jul  7 04:16:32 2010
@@ -99,14 +99,11 @@ class ServletContextListener extends Gui
 
 
   def createConfigStore():ConfigStore = {
-    println("created store")
     val store = new FileConfigStore
     store.file = new File("activemq.xml")
     LoggingTracker("config store startup") { tracker=>
-      println("starting store")
       store.start(tracker.task())
     }
-    println("store started")
     store
   }
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/ConfigurationResource.scala
(from r961191, activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/ConfigurationResource.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/ConfigurationResource.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala&r1=961191&r2=961192&rev=961192&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/ConfigurationResource.scala
Wed Jul  7 04:16:32 2010
@@ -123,6 +123,6 @@ case class Broker(parent:Root, @BeanProp
   }
 
   @Path("status")
-  def status = StatusResource(this)
+  def status = RuntimeResource(this)
 }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala?rev=961192&r1=961191&r2=961192&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RootResource.scala
Wed Jul  7 04:16:32 2010
@@ -29,6 +29,7 @@ import Response.Status._
 import org.apache.activemq.apollo.dto.{IdListDTO, BrokerSummaryDTO, BrokerDTO}
 import java.util.{Arrays, Collections}
 import org.apache.activemq.apollo.web.ConfigStore
+import org.apache.activemq.apollo.broker.BrokerRegistry
 
 /**
  * Defines the default representations to be used on resources
@@ -48,81 +49,46 @@ trait Resource extends Logging {
 }
 
 /**
- * Manages a collection of broeker resources.
+ * Manages a collection of broker resources.
  */
-@Path("/")
+@Path("/brokers")
 class Root() extends Resource {
 
-  @Context
-  var configStore = ConfigStore()
-
   @GET
-  def get() = {
+  def brokers = {
     val rc = new IdListDTO
     val ids = Future[List[String]] { cb=>
-      configStore.listBrokers(cb)
+      ConfigStore().listBrokers(cb)
     }.toArray[String]
     rc.ids.addAll(Arrays.asList(ids: _*))
     rc
   }
 
   @Path("{id}")
-  def broker(@PathParam("id") id : String): Broker = new Broker(this, id)
+  def brokers(@PathParam("id") id : String): Broker = new Broker(this, id)
 }
 
 /**
- * A broker resource is used to represent the configuration and runtime status of a broker.
+ * Resource that identifies a managed broker.
  */
 case class Broker(parent:Root, @BeanProperty id: String) extends Resource {
 
-  @Context
-  var configStore = ConfigStore()
-
   @GET
-  def get() = {
-    val c = config()
+  def get = {
     val rc = new BrokerSummaryDTO
     rc.id = id
-    rc.rev = c.rev
+    rc.manageable = BrokerRegistry.get(id)!=null
+    rc.configurable = Future[Option[BrokerDTO]] { cb=>
+        ConfigStore().getBroker(id, false)(cb)
+      }.isDefined
     rc
   }
 
-  @GET @Path("config")
-  def getConfig():BrokerDTO = {
-    config()
-  }
-
-  private def config() = {
-    Future[Option[BrokerDTO]] { cb=>
-      configStore.getBroker(id, false)(cb)
-    }.getOrElse(result(NOT_FOUND))
-  }
-
-  @GET @Path("config/{rev}")
-  def getConfig(@PathParam("rev") rev:Int):BrokerDTO = {
-    // that rev may have gone away..
-    var c = config()
-    c.rev==rev || result(NOT_FOUND)
-    c
-  }
+  @Path("config")
+  def config = ConfigurationResource(this)
 
-  @PUT @Path("config/{rev}")
-  def put(@PathParam("rev") rev:Int, config:BrokerDTO) = {
-    config.id = id;
-    config.rev = rev
-    Future[Boolean] { cb=>
-      configStore.putBroker(config)(cb)
-    } || result(NOT_FOUND)
-  }
-
-  @DELETE @Path("config/{rev}")
-  def delete(@PathParam("rev") rev:Int) = {
-    Future[Boolean] { cb=>
-      configStore.removeBroker(id, rev)(cb)
-    } || result(NOT_FOUND)
-  }
+  @Path("runtime")
+  def runtime = RuntimeResource(this)
 
-  @Path("status")
-  def status = StatusResource(this)
 }
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
(from r961191, activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/StatusResource.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/StatusResource.scala&r1=961191&r2=961192&rev=961192&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/StatusResource.scala
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
Wed Jul  7 04:16:32 2010
@@ -26,6 +26,7 @@ import collection.JavaConversions
 import org.fusesource.hawtdispatch.{ScalaDispatch, Future}
 import ScalaDispatch._
 import org.apache.activemq.apollo.broker._
+import collection.mutable.ListBuffer
 
 /**
  * <p>
@@ -34,36 +35,55 @@ import org.apache.activemq.apollo.broker
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-case class StatusResource(parent:Broker) extends Resource {
+case class RuntimeResource(parent:Broker) extends Resource {
 
-  val broker:org.apache.activemq.apollo.broker.Broker = BrokerRegistry.get(parent.id)
-  if( broker == null ) {
-    result(NOT_FOUND)
+  private def with_broker[T](func: (org.apache.activemq.apollo.broker.Broker, Option[T]=>Unit)=>Unit):T = {
+    val broker:org.apache.activemq.apollo.broker.Broker = BrokerRegistry.get(parent.id)
+    if( broker == null ) {
+      result(NOT_FOUND)
+    } else {
+      Future[Option[T]] { cb=>
+        broker.dispatchQueue {
+          func(broker, cb)
+        }
+      }.getOrElse(result(NOT_FOUND))
+    }
+  }
+
+  private def with_virtual_host[T](id:Long)(func: (VirtualHost, Option[T]=>Unit)=>Unit):T = {
+    with_broker { case (broker, cb) =>
+      broker.virtualHosts.valuesIterator.find( _.id == id) match {
+        case Some(virtualHost)=>
+          virtualHost.dispatchQueue {
+            func(virtualHost, cb)
+          }
+        case None=> cb(None)
+      }
+    }
   }
 
+
   @GET
   def get() = {
-    Future[BrokerStatusDTO] { cb=>
-      broker.dispatchQueue {
-        val result = new BrokerStatusDTO
-
-        result.id = broker.id
-        result.currentTime = System.currentTimeMillis
-        result.state = broker.serviceState.toString
-        result.stateSince - broker.serviceState.since
-        result.config = broker.config
-
-        broker.connectors.foreach{ c=>
-          result.connectors.add(c.id)
-        }
+    with_broker[BrokerStatusDTO] { case (broker, cb) =>
+      val result = new BrokerStatusDTO
 
-        broker.virtualHosts.values.foreach{ host=>
-          result.virtualHosts.add( host.id )
-        }
+      result.id = broker.id
+      result.currentTime = System.currentTimeMillis
+      result.state = broker.serviceState.toString
+      result.stateSince - broker.serviceState.since
+      result.config = broker.config
 
+      broker.connectors.foreach{ c=>
+        result.connectors.add(c.id)
+      }
 
-        cb(result)
+      broker.virtualHosts.values.foreach{ host=>
+        result.virtualHosts.add( host.id )
       }
+
+
+      cb(Some(result))
     }
   }
 
@@ -74,18 +94,6 @@ case class StatusResource(parent:Broker)
     list.toArray(new Array[jl.Long](list.size))
   }
 
-  private def with_virtual_host[T](id:Long)(func: (VirtualHost, Option[T]=>Unit)=>Unit):T = {
-    Future[Option[T]] { cb=>
-      broker.virtualHosts.valuesIterator.find( _.id == id) match {
-        case Some(virtualHost)=>
-          virtualHost.dispatchQueue {
-            func(virtualHost, cb)
-          }
-        case None=> cb(None)
-      }
-    }.getOrElse(result(NOT_FOUND))
-  }
-
   @GET @Path("virtual-hosts/{id}")
   def virtualHost(@PathParam("id") id : Long):VirtualHostStatusDTO = {
     with_virtual_host(id) { case (virtualHost,cb) =>
@@ -130,7 +138,7 @@ case class StatusResource(parent:Broker)
   }
 
   @GET @Path("virtual-hosts/{id}/queues/{queue}")
-  def queue(@PathParam("id") id : Long, @PathParam("queue") qid : Long):QueueStatusDTO = {
+  def queue(@PathParam("id") id : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean ):QueueStatusDTO = {
     with_virtual_host(id) { case (virtualHost,cb) =>
       import JavaConversions._
       virtualHost.queues.valuesIterator.find { _.id == qid } match {
@@ -155,24 +163,25 @@ case class StatusResource(parent:Broker)
             result.flushingSize = q.flushing_size
             result.flushedItems = q.flushed_items
 
-
-            var cur = q.head_entry
-            while( cur!=null ) {
-
-              val e = new EntryStatusDTO
-              e.seq = cur.seq
-              e.count = cur.count
-              e.size = cur.size
-              e.consumerCount = cur.parked.size
-              e.prefetchCount = cur.prefetched
-              e.state = cur.label
-
-              result.entries.add(e)
-
-              cur = if( cur == q.tail_entry ) {
-                null
-              } else {
-                cur.nextOrTail
+            if( entries ) {
+              var cur = q.head_entry
+              while( cur!=null ) {
+
+                val e = new EntryStatusDTO
+                e.seq = cur.seq
+                e.count = cur.count
+                e.size = cur.size
+                e.consumerCount = cur.parked.size
+                e.prefetchCount = cur.prefetched
+                e.state = cur.label
+
+                result.entries.add(e)
+
+                cur = if( cur == q.tail_entry ) {
+                  null
+                } else {
+                  cur.nextOrTail
+                }
               }
             }
 
@@ -193,27 +202,61 @@ case class StatusResource(parent:Broker)
 
   @GET @Path("connectors/{id}")
   def connector(@PathParam("id") id : Long):ConnectorStatusDTO = {
-
-    Future[Option[ConnectorStatusDTO]] { cb=>
+    with_broker { case (broker, cb) =>
       broker.connectors.find(_.id == id) match {
+        case None=> cb(None)
         case Some(connector)=>
-          connector.dispatchQueue {
-            val result = new ConnectorStatusDTO
-            result.id = connector.id
-            result.state = connector.serviceState.toString
-            result.stateSince = connector.serviceState.since
-            result.config = connector.config
-
-            result.accepted = connector.accept_counter.get
-            connector.connections.keysIterator.foreach { id=>
-              result.connections.add(id)
-            }
-            cb(Some(result))
+
+          val result = new ConnectorStatusDTO
+          result.id = connector.id
+          result.state = connector.serviceState.toString
+          result.stateSince = connector.serviceState.since
+          result.config = connector.config
+
+          result.accepted = connector.accept_counter.get
+          connector.connections.keysIterator.foreach { id=>
+            result.connections.add(id)
           }
-        case None=> cb(None)
+          cb(Some(result))
       }
-    }.getOrElse(result(NOT_FOUND))
+    }
+  }
+
+
+  @GET @Path("connections")
+  def connections :Array[Long] = {
+    val rc:Array[Long] = with_broker { case (broker, cb) =>
+      val rc = ListBuffer[Long]()
+      broker.connectors.foreach { connector=>
+        connector.connections.keys.foreach { id =>
+          rc += id.longValue
+        }
+      }
+      cb(Some(rc.toArray))
+    }
+    rc.sorted
+  }
 
+  @GET @Path("connections/{id}")
+  def connections(@PathParam("id") id : Long):ConnectionStatusDTO = {
+    with_broker { case (broker, cb) =>
+      broker.connectors.flatMap{ _.connections.get(id) }.headOption match {
+        case None => cb(None)
+        case Some(connection:BrokerConnection) =>
+          connection.dispatchQueue {
+            val result = new ConnectionStatusDTO
+            result.id = connection.id
+            result.state = connection.serviceState.toString
+            result.stateSince = connection.serviceState.since
+            result.protocol = connection.protocol
+            result.transport = connection.transport.getTypeId
+            result.remoteAddress = connection.transport.getRemoteAddress
+            result.writeCounter = connection.transport.getWriteCounter
+            result.readCounter = connection.transport.getReadCounter
+            cb(Some(result))
+          }
+      }
+    }
   }
 
 }



Mime
View raw message