activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1235356 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Date Tue, 24 Jan 2012 16:58:17 GMT
Author: chirino
Date: Tue Jan 24 16:58:16 2012
New Revision: 1235356

URL: http://svn.apache.org/viewvc?rev=1235356&view=rev
Log:
Implemented more robust session closing log on queues and consumers so that when a queue or
consumer is stopped, any inflight messages on it's session are drained and Nacked.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1235356&r1=1235355&r2=1235356&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Tue Jan 24 16:58:16 2012
@@ -27,7 +27,6 @@ import org.apache.activemq.apollo.util.l
 import org.fusesource.hawtdispatch.{ListEventAggregator, DispatchQueue, BaseRetained}
 import OptionSupport._
 import java.util.concurrent.atomic.{AtomicReference, AtomicInteger}
-import org.fusesource.hawtbuf.Buffer
 import java.lang.UnsupportedOperationException
 import security.SecuredResource._
 import security.{SecuredResource, SecurityContext}
@@ -466,6 +465,11 @@ class Queue(val router: LocalRouter, val
   var stop_listener_waiting_for_flush:Runnable = _
 
   protected def _stop(on_completed: Runnable) = {
+
+    // Now that we are stopping the queue will no longer be 'full'
+    // draining will nack all enqueue attempts.
+    messages.refiller.run
+
     // Disconnect the producers..
     producers.foreach { producer =>
       disconnect(producer)
@@ -520,17 +524,34 @@ class Queue(val router: LocalRouter, val
 
     var refiller: Runnable = null
 
-    def full = (producer_swapped_in.size >= producer_swapped_in.size_max) || is_enqueue_throttled
|| !service_state.is_started || (tune_quota >=0 && queue_size > tune_quota)
+    def is_quota_exceeded = (tune_quota >= 0 && queue_size > tune_quota)
+    def is_enqueue_throttled = (enqueues_remaining!=null && enqueues_remaining.get()
<= 0)
+    def is_enqueue_buffer_maxed = (producer_swapped_in.size >= producer_swapped_in.size_max)
+
+    def full = if( service_state.is_started ) {
+      is_enqueue_buffer_maxed || is_enqueue_throttled || is_quota_exceeded
+    } else if( service_state.is_starting) {
+      true
+    } else {
+      false
+    }
 
     def offer(delivery: Delivery): Boolean = {
       if (full) {
         false
       } else {
 
-        // Don't even enqueue if the message has expired.
+        // Don't even enqueue if the message has expired or the queue has stopped.
         val expiration = delivery.message.expiration
-        if( expiration != 0 && expiration <= now ) {
-          expired(delivery)
+        val expired = expiration != 0 && expiration <= now
+
+        if( !service_state.is_started || expired) {
+          if( delivery.ack!=null ) {
+            delivery.ack(if ( expired ) Expired else Undelivered, delivery.uow)
+          }
+          if( delivery.uow!=null ) {
+            delivery.uow.release()
+          }
           return true
         }
 
@@ -838,7 +859,6 @@ class Queue(val router: LocalRouter, val
   var max_enqueue_rate = Int.MaxValue
   var enqueues_remaining:LongCounter = _
   
-  def is_enqueue_throttled = enqueues_remaining!=null && enqueues_remaining.get()
<= 0
 
   def enqueue_remaining_take(amount:Int) = {
     if(enqueues_remaining!=null) {
@@ -912,12 +932,15 @@ class Queue(val router: LocalRouter, val
       change_producer_capacity( session_max )
     }
 
-    def close = {
-      session_manager.close(downstream)
-      dispatch_queue {
-        change_producer_capacity( -session_max )
-        inbound_sessions -= this
-      }
+    def close = dispatch_queue {
+      session_manager.close(downstream, (delivery)=>{
+        // We have been closed so we have to nak any deliveries.
+        if( delivery.ack!=null ) {
+          delivery.ack(Undelivered, delivery.uow)
+        }
+      })
+      change_producer_capacity( -session_max )
+      inbound_sessions -= this
       release
     }
 
@@ -1428,14 +1451,16 @@ class QueueEntry(val queue:Queue, val se
     override  def as_loaded = this
 
     def store = {
-      if(!storing) {
+      // We should no longer be storing stuff if stopped.
+      assert(queue.service_state.is_starting_or_started)
+      if(!stored && !storing) {
         storing = true
         delivery.uow.enqueue(toQueueEntryRecord)
         queue.swapping_out_size+=size
         delivery.uow.on_flush { canceled =>
           queue.swap_out_completes_source.merge(^{
-            queue.swapping_out_size-=size
             this.swapped_out(!canceled)
+            queue.swapping_out_size-=size
             if( queue.swapping_out_size==0 ) {
               queue.on_queue_flushed
             }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1235356&r1=1235355&r2=1235356&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Tue Jan 24 16:58:16 2012
@@ -193,17 +193,19 @@ class SinkMux[T](val downstream:Sink[T])
 
   class ManagedSink extends Sink[T] {
 
-    var closed = false
+    var rejection_handler:(T)=>Unit = _
     var refiller:Runnable = NOOP
 
     def offer(value: T) = {
-      if ( closed ) {
-        true
+      if ( full ) {
+        false
       } else {
-        downstream.offer(value)
+        val accepted = downstream.offer(value)
+        assert(accepted)
+        true
       }
     }
-    def full = !closed && downstream.full
+    def full = downstream.full && rejection_handler==null
   }
   
   def open():Sink[T] = {
@@ -212,14 +214,15 @@ class SinkMux[T](val downstream:Sink[T])
     sink
   }
 
-  def close(sink:Sink[T]):Unit = {
+  def close(sink:Sink[T], rejection_handler:(T)=>Unit):Unit = {
     sink match {
       case sink:ManagedSink =>
-        sink.closed = true
+        assert(sink.rejection_handler==null)
+        sink.rejection_handler = rejection_handler
         sink.refiller.run()
         sinks -= sink
       case _ =>
-        error("We did not open that sink")
+        sys.error("We did not open that sink")
     }
   }
 }
@@ -349,13 +352,13 @@ class SessionSinkMux[T](val downstream:S
     session
   }
 
-  def close(session:Sink[T]):Unit = {
+  def close(session:Sink[T], rejection_handler:(T)=>Unit):Unit = {
     consumer_queue <<| ^{
       session match {
         case s:Session[T] =>
           sessions -= s
           s.producer_queue {
-            s.close
+            s.close(rejection_handler)
           }
       }
     }
@@ -388,18 +391,13 @@ class Session[T](val producer_queue:Disp
   }
   credit_adder.resume
 
-  private var closed = false
-  private var _full = credits <= 0
-
+  private var rejection_handler: (T)=>Unit = _
+  
   private def add_credits(value:Int) = {
+    val was_full = _full
     credits += value;
-    if( closed || credits <= 0 ) {
-      _full = true
-    } else if( credits >= 0 ) {
-      if( _full ) {
-        _full  = false
-        refiller.run
-      }
+    if( was_full && !_full ) {
+      refiller.run
     }
   }
 
@@ -414,29 +412,35 @@ class Session[T](val producer_queue:Disp
     producer_queue.assertExecuting()
     _full
   }
+  
+  def _full = credits <= 0 && rejection_handler == null
 
   override def offer(value: T) = {
     producer_queue.assertExecuting()
-    if( _full || closed ) {
+    if( _full ) {
       false
     } else {
-      val size = sizer.size(value)
-
-      enqueue_item_counter += 1
-      enqueue_size_counter += size
-      enqueue_ts = mux.time_stamp
-
-      add_credits(-size)
-      downstream.merge((this, value))
+      if( rejection_handler!=null ) {
+        rejection_handler(value)
+      } else {
+        val size = sizer.size(value)
+  
+        enqueue_item_counter += 1
+        enqueue_size_counter += size
+        enqueue_ts = mux.time_stamp
+  
+        add_credits(-size)
+        downstream.merge((this, value))
+      }
       true
     }
   }
 
-  def close = {
+  def close(rejection_handler:(T)=>Unit) = {
     producer_queue.assertExecuting()
-    if( !closed ) {
-      closed=true
-    }
+    assert(this.rejection_handler==null)
+    this.rejection_handler=rejection_handler
+    refiller.run
   }
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1235356&r1=1235355&r2=1235356&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Tue Jan 24 16:58:16 2012
@@ -798,7 +798,10 @@ class OpenwireProtocolHandler extends Pr
     override def dispose() = dispatchQueue {
       ack_handler.close
       super.dispose()
-      sink_manager.close(consumer_sink)
+      sink_manager.close(consumer_sink,(frame)=>{
+        debug("Got command after consumer was disposed.");
+      })
+
     }
     
     
@@ -931,7 +934,12 @@ class OpenwireProtocolHandler extends Pr
       }
 
       def dispose = {
-        session_manager.close(downstream)
+        session_manager.close(downstream,(delivery)=>{
+          // We have been closed so we have to nak any deliveries.
+          if( delivery.ack!=null ) {
+            delivery.ack(Undelivered, delivery.uow)
+          }
+        })
         if( info.getDestination.isTemporary ) {
           reset {
             val rc = host.router.delete(destination, security_context)
@@ -1149,9 +1157,7 @@ class OpenwireProtocolHandler extends Pr
       }
 
       if( uow!=null ) {
-        uow.on_complete(^{
-          onComplete
-        })
+        uow.on_complete(onComplete)
         uow.release
       } else {
         onComplete

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1235356&r1=1235355&r2=1235356&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Tue Jan 24 16:58:16 2012
@@ -425,7 +425,9 @@ class StompProtocolHandler extends Proto
     override def dispose() = dispatchQueue {
       ack_handler.close
       credit_window_filter.disable
-      sink_manager.close(consumer_sink)
+      sink_manager.close(consumer_sink, (frame)=>{
+        debug("Got frame after consumer was disposed.");
+      })
       super.dispose()
     }
 
@@ -512,7 +514,12 @@ class StompProtocolHandler extends Proto
       }
 
       def dispose = {
-        session_manager.close(downstream)
+        session_manager.close(downstream, (delivery)=>{
+          // We have been closed so we have to nak any deliveries.
+          if( delivery.ack!=null ) {
+            delivery.ack(Undelivered, delivery.uow)
+          }
+        })
         release
       }
 



Mime
View raw message