activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1347661 [2/2] - in /activemq/activemq-apollo/trunk: ./ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/test/scala/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-dto/src/main/resources/org...
Date Thu, 07 Jun 2012 15:04:25 GMT
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
Thu Jun  7 15:04:23 2012
@@ -19,6 +19,7 @@ package org.apache.activemq.apollo.util
 import org.fusesource.hawtdispatch.DispatchQueue
 import org.fusesource.hawtdispatch._
 import collection.mutable.ListBuffer
+import java.util.concurrent.TimeUnit
 
 object BaseService extends Log
 
@@ -82,6 +83,7 @@ trait BaseService extends Service with D
       def do_start = {
         val state = new STARTING()
         _service_state = state
+        _start_transition_counter += 1
         try {
           _start(^ {
             _service_state = new STARTED
@@ -130,6 +132,7 @@ trait BaseService extends Service with D
         case state:STARTED =>
           val state = new STOPPING
           _service_state = state
+          _start_transition_counter += 1
           try {
             _stop(^ {
               _service_state = new STOPPED
@@ -159,6 +162,20 @@ trait BaseService extends Service with D
     stop_task >>: dispatch_queue
   }
 
+  var _start_transition_counter = 0
+  def schedule_reoccurring(time:Long, unit:TimeUnit)(func: =>Unit) = {
+    val counter = _start_transition_counter
+    def schedule:Unit = dispatch_queue.after(time, unit) {
+      if( counter == _start_transition_counter ) {
+        try {
+          func
+        } finally {
+          schedule
+        }
+      }
+    }
+    schedule
+  }
   protected def _start(on_completed:Task)
   protected def _stop(on_completed:Task)
 

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
Thu Jun  7 15:04:23 2012
@@ -24,10 +24,10 @@ package org.apache.activemq.apollo.util
  */
 object CollectionsSupport {
 
-  def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
-    val updating = prev.intersect(next)
-    val adding = next -- updating
-    val removing = prev -- next
-    (adding, updating, removing)
+  def diff[T](prev:scala.collection.Set[T], next:scala.collection.Set[T]) = {
+    val same = prev.intersect(next)
+    val added = next -- same
+    val removed = prev -- next
+    (added, same, removed)
   }
 }
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/StateMachine.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/StateMachine.scala?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/StateMachine.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/StateMachine.scala
Thu Jun  7 15:04:23 2012
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.util
+
+/**
+ * Used to enforce a single state on an object.  The object can only be
+ * transitioned to a new state by the current state of the object.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class StateMachine {
+
+  /**
+   * A state of the object.
+   */
+  abstract class State {
+
+    def init() = {}
+
+    /**
+     * Changes to the new state only if this state is still the current state.
+     * @param next
+     */
+    final protected def become(next: State) = {
+      if( _state == this ) {
+        _state = next
+        next.init()
+      }
+    }
+
+    /**
+     * Executes the code block only if we are still the current state.
+     * @param func
+     */
+    final def react(func: =>Unit) = {
+      if( _state == this ) {
+        func
+      }
+    }
+  }
+
+  case class Pause(next:State) extends State {
+    def continue = become(next)
+  }
+
+
+  private var _state = init()
+  protected def init():State
+  def state = _state
+
+  def react[T <: State : Manifest](func: (T)=>Unit) = {
+    var m = manifest[T]
+    if( m.erasure == _state.getClass ) {
+      func(_state.asInstanceOf[T])
+    }
+  }
+
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
Thu Jun  7 15:04:23 2012
@@ -479,6 +479,29 @@ class BrokerResource() extends Resource 
     }
   }
 
+  @GET @Path("/virtual-hosts/{id}/load-status")
+  @ApiOperation(value = "Gets a report of the message load on the broker.")
+  @Produces(Array(APPLICATION_JSON))
+  def topics(@PathParam("id") id : String ):LoadStatusDTO = {
+    with_virtual_host[LoadStatusDTO](id) { host =>
+      val router: LocalRouter = host
+      val queue_loads = Future.all {
+        router.local_queue_domain.destination_by_id.values.map { value  =>
+          monitoring[DestinationLoadDTO](value) {
+            value.load_status
+          }
+        }
+      }
+      queue_loads.map { queue_loads=>
+        val rc = new LoadStatusDTO
+        queue_loads.flatMap(_.success_option).foreach { load=>
+          rc.queues.add(load)
+        }
+        Success(rc)
+      }
+    }
+  }
+
   @GET @Path("/virtual-hosts/{id}/topics")
   @ApiOperation(value = "Gets a list of all the topics that exist on the broker.")
   @Produces(Array(APPLICATION_JSON))

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Thu Jun  7 15:04:23 2012
@@ -173,6 +173,7 @@
     <module>apollo-web</module>
     <module>apollo-cli</module>
     <module>apollo-website</module>
+    <module>apollo-network</module>
     <module>apollo-openwire-generator</module>
     <module>apollo-openwire</module>
     <module>apollo-distro</module>



Mime
View raw message