activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1443061 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/ ...
Date Wed, 06 Feb 2013 16:46:27 GMT
Author: chirino
Date: Wed Feb  6 16:46:26 2013
New Revision: 1443061

URL: http://svn.apache.org/viewvc?rev=1443061&view=rev
Log:
We now make sure in-flight messages sent by STOMP producers are accepted by the broker before
disconnecting the STOMP connection.  This should improve flow control in situations where producers
send and disconnect over and over.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1443061&r1=1443060&r2=1443061&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Wed Feb  6 16:46:26 2013
@@ -737,6 +737,10 @@ class Queue(val router: LocalRouter, val
           }
         }
 
+        if( delivery.ack!=null ) {
+          delivery.ack(Consumed, queue_delivery.uow)
+        }
+
         // release the store batch...
         if (persisted) {
           queue_delivery.uow.release

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1443061&r1=1443060&r2=1443061&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Wed Feb  6 16:46:26 2013
@@ -201,7 +201,7 @@ object DeliveryProducerRoute extends Log
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-abstract class DeliveryProducerRoute(router:Router) extends Sink[Delivery] with BindableDeliveryProducer
{
+abstract class DeliveryProducerRoute(router:Router) extends Sink[Delivery] with BindableDeliveryProducer
with DeferringDispatched {
   import DeliveryProducerRoute._
 
   var last_send = Broker.now
@@ -217,7 +217,7 @@ abstract class DeliveryProducerRoute(rou
     null
   }
 
-  def connected() = dispatch_queue {
+  def connected() = defer {
     on_connected
   }
 
@@ -235,7 +235,7 @@ abstract class DeliveryProducerRoute(rou
 
   def connect(x:DeliveryConsumer) = x.connect(this)
 
-  def unbind(targets:List[DeliveryConsumer]) = dispatch_queue {
+  def unbind(targets:List[DeliveryConsumer]) = defer {
     this.targets = this.targets.filterNot { x=>
       val rc = targets.contains(x.consumer)
       if( rc ) {
@@ -253,7 +253,7 @@ abstract class DeliveryProducerRoute(rou
     targets.foreach(_.release)
   }
 
-  def disconnected() = dispatch_queue {
+  def disconnected() = defer {
     this.targets.foreach { x=>
       debug("producer route detaching from consumer.")
       x.close
@@ -270,7 +270,6 @@ abstract class DeliveryProducerRoute(rou
   // Dispatch.
   //
 
-  var pendingAck: (DeliveryResult, StoreUOW)=>Unit = null
   var overflow:Delivery=null
   var overflowSessions = List[DeliverySession]()
   var refiller:Task=null
@@ -283,10 +282,32 @@ abstract class DeliveryProducerRoute(rou
       false
     } else {
       last_send = Broker.now
+
       // Do we need to store the message if we have a matching consumer?
-      pendingAck = delivery.ack
+      var matching_targets = 0
+      val original_ack = delivery.ack
       val copy = delivery.copy
-      
+
+        if ( original_ack!=null ) {
+        copy.ack = (result, uow)=> {
+          defer {
+            matching_targets -= 1
+            if ( matching_targets<= 0 && copy.ack!=null ) {
+              copy.ack = null
+              if (delivery.uow != null) {
+                delivery.uow.on_complete {
+                  defer {
+                    original_ack(Consumed, null)
+                  }
+                }
+              } else {
+                original_ack(Consumed, null)
+              }
+            }
+          }
+        }
+      }
+
       if(copy.message!=null) {
         copy.message.retain
       }
@@ -295,7 +316,7 @@ abstract class DeliveryProducerRoute(rou
 
         // only deliver to matching consumers
         if( target.consumer.matches(copy) ) {
-
+          matching_targets += 1
           if ( target.consumer.is_persistent && copy.persistent && store
!= null) {
 
             if (copy.uow == null) {
@@ -314,29 +335,21 @@ abstract class DeliveryProducerRoute(rou
         }
       }
 
+      if ( matching_targets == 0 && original_ack!=null ) {
+        original_ack(Consumed, null)
+      }
+
       if( overflowSessions!=Nil ) {
         overflow = copy
       } else {
-        delivered(copy)
+        release(copy)
       }
       true
     }
   }
 
 
-  private def delivered(delivery: Delivery): Unit = {
-    if (pendingAck != null) {
-      if (delivery.uow != null) {
-        val ack = pendingAck
-        delivery.uow.on_complete {
-          ack(Consumed, null)
-        }
-
-      } else {
-        pendingAck(Consumed, null)
-      }
-      pendingAck==null
-    }
+  private def release(delivery: Delivery): Unit = {
     if (delivery.uow != null) {
       delivery.uow.release
     }
@@ -355,7 +368,7 @@ abstract class DeliveryProducerRoute(rou
         }
       }
       if( overflowSessions==Nil ) {
-        delivered(overflow)
+        release(overflow)
         overflow = null
         if(refiller!=null)
           refiller.run

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1443061&r1=1443060&r2=1443061&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Wed Feb  6 16:46:26 2013
@@ -103,6 +103,9 @@ class Topic(val router:LocalRouter, val 
           retained_message = null;
         case _ =>
       }
+      if( value.ack != null ) {
+        value.ack(Consumed, value.uow)
+      }
       true
     }
 
@@ -149,12 +152,24 @@ class Topic(val router:LocalRouter, val 
     def producer = session.producer
     def consumer = session.consumer
 
+    val ack_pass_through = proxy.link.kind == "dsub"
+
     def offer(value: Delivery) = {
       val copy = value.copy();
-      copy.uow = value.uow
-      copy.ack = value.ack
       copy.sender ::= address
-      downstream.offer(copy)
+      if ( ack_pass_through ) {
+        copy.ack = value.ack
+        copy.uow = value.uow
+      }
+
+      val accepted = downstream.offer(copy)
+
+      // If we don't ack now, then the sender's ack will
+      // wait for the consumers ack which might be a nice option to give folks.
+      if( accepted && !ack_pass_through && value.ack!=null ) {
+        value.ack(Consumed, value.uow)
+      }
+      accepted
     }
   }
 
@@ -399,10 +414,9 @@ class Topic(val router:LocalRouter, val 
       if (r != null) {
         val copy = r.copy()
         copy.sender ::= address
-
         val producer = new  DeliveryProducerRoute(router) {
           refiller = NOOP
-          val dispatch_queue = createQueue()
+          def dispatch_queue = Topic.this.dispatch_queue
           override protected def on_connected = {
             copy.ack = (d,x) => consumer.dispatch_queue {
               unbind(consumer :: Nil)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1443061&r1=1443060&r2=1443061&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Wed Feb  6 16:46:26 2013
@@ -814,9 +814,7 @@ class StompProtocolHandler extends Proto
 
       // TODO: if there are too many open connections we should just close the connection
       // without waiting for the error to get sent to the client.
-      queue.after(die_delay, TimeUnit.MILLISECONDS) {
-        connection.stop(NOOP)
-      }
+      disconnect(true)
     }
     throw new Break()
   }
@@ -848,11 +846,15 @@ class StompProtocolHandler extends Proto
     if( !closed ) {
       suspend_read("shutdown")
       connection_log.info("Shutting connection '%s'  down due to: %s", security_context.remote_address,
error)
+      disconnect(false)
     }
-    connection.stop(NOOP)
   }
 
-  override def on_transport_disconnected() = {
+  override def on_transport_disconnected {
+    disconnect(false)
+  }
+
+  def disconnect(delay: =>Boolean) = {
     if( !closed ) {
       heart_beat_monitor.stop
       closed=true;
@@ -866,12 +868,12 @@ class StompProtocolHandler extends Proto
       }
       transactions.clear()
 
-      producerRoutes.values().foreach{ route=>
+      producer_routes.values().foreach{ route=>
         host.dispatch_queue {
           host.router.disconnect(route.addresses, route)
         }
       }
-      producerRoutes.clear
+      producer_routes.clear
       consumers.foreach { case (_,consumer)=>
         val addresses = consumer.addresses
         host.dispatch_queue {
@@ -886,6 +888,15 @@ class StompProtocolHandler extends Proto
         }
       })
       trace("stomp protocol resources released")
+      on_routing_empty {
+        if( delay ) {
+          queue.after(die_delay, TimeUnit.MILLISECONDS) {
+            connection.stop(NOOP)
+          }
+        } else {
+          connection.stop(NOOP)
+        }
+      }
     }
   }
 
@@ -946,21 +957,8 @@ class StompProtocolHandler extends Proto
                 on_stomp_unsubscribe(frame.headers)
               case NACK =>
                 on_stomp_nack(frame)
-
               case DISCONNECT =>
-
-                val delay = send_receipt(frame.headers)!=null
-                on_transport_disconnected
-                if( delay ) {
-                  queue.after(die_delay, TimeUnit.MILLISECONDS) {
-                    connection.stop(NOOP)
-                  }
-                } else {
-                  // no point in delaying the connection shutdown
-                  // if the client does not want a receipt..
-                  connection.stop(NOOP)
-                }
-
+                disconnect(send_receipt(frame.headers)!=null)
               case _ =>
                 die("Invalid STOMP frame command: "+frame.action);
             }
@@ -1141,6 +1139,16 @@ class StompProtocolHandler extends Proto
     }
   }
 
+  var routing_size = 0L
+  var pending_routing_empty_callbacks = ListBuffer[()=>Unit]()
+  def on_routing_empty(func: => Unit) = {
+    if( routing_size== 0 ) {
+      func
+    } else {
+      pending_routing_empty_callbacks.append( func _ )
+    }
+  }
+
   class StompProducerRoute(val dest: AsciiBuffer) extends DeliveryProducerRoute(host.router)
{
     val addresses = decode_addresses(dest)
     val key = addresses.toList
@@ -1151,18 +1159,43 @@ class StompProtocolHandler extends Proto
 
     override def dispatch_queue = queue
 
+
     refiller = ^ {
       resume_read
     }
+
+
+    override def offer(delivery: Delivery): Boolean = {
+      if( full )
+        return false
+      routing_size += delivery.size
+      val original_ack = delivery.ack
+      delivery.ack = (result, uow) => {
+        dispatch_queue.assertExecuting()
+        if ( original_ack!=null ) {
+          original_ack(result, uow)
+        }
+        routing_size -= delivery.size
+        if( routing_size==0 && !pending_routing_empty_callbacks.isEmpty) {
+          val t = pending_routing_empty_callbacks
+          pending_routing_empty_callbacks = ListBuffer()
+          for ( func <- t ) {
+            func()
+          }
+        }
+      }
+      super.offer(delivery)
+    }
   }
 
+
   var maintenance_scheduled = false
   def schedule_maintenance:Unit = {
-    if(!maintenance_scheduled && !producerRoutes.isEmpty) {
+    if(!maintenance_scheduled && !producer_routes.isEmpty) {
       maintenance_scheduled = true
       dispatchQueue.after(2, TimeUnit.SECONDS) {
         maintenance_scheduled = false
-        if( !producerRoutes.isEmpty ) {
+        if( !producer_routes.isEmpty ) {
           try {
             producer_maintenance
           } finally {
@@ -1177,20 +1210,20 @@ class StompProtocolHandler extends Proto
     val now = Broker.now
     import collection.JavaConversions._
     val expired = ListBuffer[StompProducerRoute]()
-    for( route <- producerRoutes.values() ) {
+    for( route <- producer_routes.values() ) {
       if( (now - route.last_send) > 2000 ) {
         expired += route
       }
     }
     for( route <- expired ) {
-      producerRoutes.remove(route.dest)
+      producer_routes.remove(route.dest)
       host.dispatch_queue {
         host.router.disconnect(route.addresses, route)
       }
     }
   }
 
-  var producerRoutes = new LRUCache[AsciiBuffer, StompProducerRoute](10) {
+  var producer_routes = new LRUCache[AsciiBuffer, StompProducerRoute](10) {
     override def onCacheEviction(eldest: Entry[AsciiBuffer, StompProducerRoute]) = {
       host.dispatch_queue {
         host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
@@ -1200,7 +1233,7 @@ class StompProtocolHandler extends Proto
 
   def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
     val dest = get(frame.headers, DESTINATION).get
-    producerRoutes.get(dest) match {
+    producer_routes.get(dest) match {
       case null =>
         // Deep copy to avoid holding onto a 64k buffer
         val trimmed_dest = dest.deepCopy().ascii()
@@ -1216,7 +1249,7 @@ class StompProtocolHandler extends Proto
               case None =>
                 if (!connection.stopped) {
                   resume_read
-                  producerRoutes.put(trimmed_dest, route)
+                  producer_routes.put(trimmed_dest, route)
                   schedule_maintenance
                   send_via_route(route.addresses, route, frame, uow)
                 }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1443061&r1=1443060&r2=1443061&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Wed Feb  6 16:46:26 2013
@@ -1618,4 +1618,34 @@ class StompParallelTest extends StompTes
 
   }
 
+
+  test("STOMP flow control.") {
+    val dest = next_id("/queue/quota.flow_control")
+
+    // start a sub client /w a small credit window.
+    val sub = connect("1.1")
+    subscribe("my-sub-name", dest, "client", headers="credit:1,0\n")
+
+    val sent = new AtomicInteger(0)
+    val body = "x"*1024*10
+    Broker.BLOCKABLE_THREAD_POOL {
+      for( i <- 1 to 10 ) {
+        println("sending: "+i)
+        val client = connect("1.1", new StompClient)
+        async_send(dest, body, c=client)
+        disconnect(client)
+        sent.incrementAndGet()
+      }
+    }
+
+    for( i <- 1 to 10 ) {
+      within(1, SECONDS) {
+        sent.get should be(i)
+      }
+      assert_received(body)(true)
+      Thread.sleep(200)
+    }
+
+  }
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala?rev=1443061&r1=1443060&r2=1443061&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
Wed Feb  6 16:46:26 2013
@@ -27,7 +27,20 @@ import org.fusesource.hawtdispatch._
  */
 trait Dispatched {
   def dispatch_queue:DispatchQueue
-
   def assert_executing = dispatch_queue.assertExecuting()
+}
+
+trait DeferringDispatched extends Dispatched {
+
+  def defer(func: =>Unit) = {
+    dispatch_queue_task_source.merge(new Task(){
+      def run() {
+        func
+      }
+    })
+  }
 
+  val dispatch_queue_task_source = createSource(new ListEventAggregator[Task](), dispatch_queue)
+  dispatch_queue_task_source.setEventHandler(^{ dispatch_queue_task_source.getData.foreach(_.run())
});
+  dispatch_queue_task_source.resume()
 }
\ No newline at end of file



Mime
View raw message