activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1213481 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/...
Date Mon, 12 Dec 2011 22:49:17 GMT
Author: chirino
Date: Mon Dec 12 22:49:17 2011
New Revision: 1213481

URL: http://svn.apache.org/viewvc?rev=1213481&view=rev
Log:
Fix for APLO-88: ACK followed by DISCONNECT Leaves Message Available

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1213481&r1=1213480&r2=1213481&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Mon Dec 12 22:49:17 2011
@@ -1822,14 +1822,14 @@ class Subscription(val queue:Queue, val 
 
   var total_ack_count = 0L
   var total_nack_count = 0L
-
+  
   override def toString = {
     def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
     "{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+"}"
   }
 
-  def browser = session.consumer.browser
-  def exclusive = session.consumer.exclusive
+  def browser = consumer.browser
+  def exclusive = consumer.exclusive
 
   // This opens up the consumer
   def open() = {
@@ -1869,6 +1869,17 @@ class Subscription(val queue:Queue, val 
     queue.check_idle
   }
 
+  var pending_close_action: ()=>Unit = _
+  
+  def check_finish_close = {
+    // We can complete the closing of the sub
+    // once the outstanding acks are settled.
+    if (pending_close_action!=null && acquired.isEmpty) {
+      pending_close_action()
+      pending_close_action = null
+    }
+  }
+
   def close() = {
     if(pos!=null) {
       pos -= this
@@ -1876,30 +1887,27 @@ class Subscription(val queue:Queue, val 
 
       queue.exclusive_subscriptions = queue.exclusive_subscriptions.filterNot( _ == this )
       queue.all_subscriptions -= consumer
-      queue.change_consumer_capacity( - queue.tune_consumer_buffer )
-
-
-      // nack all the acquired entries.
-      var next = acquired.getHead
-      while( next !=null ) {
-        val cur = next;
-        next = next.getNext
-        cur.entry.redelivered
-        cur.nack // this unlinks the entry.
-      }
-
-      if( exclusive ) {
-        // rewind all the subs to the start of the queue.
-        queue.all_subscriptions.values.foreach(_.rewind(queue.head_entry))
-      }
 
       session.refiller = NOOP
       session.close
       session = null
-      consumer.release
 
-      queue.check_idle
-      queue.trigger_swap
+      // The following action gets executed once all acquired messages
+      // are acked or nacked.
+      pending_close_action = ()=> {
+        queue.change_consumer_capacity( - queue.tune_consumer_buffer )
+
+        if( exclusive ) {
+          // rewind all the subs to the start of the queue.
+          queue.all_subscriptions.values.foreach(_.rewind(queue.head_entry))
+        }
+  
+        queue.check_idle
+        queue.trigger_swap
+      }
+
+      consumer.release
+      check_finish_close
     } else {}
   }
 
@@ -1912,7 +1920,7 @@ class Subscription(val queue:Queue, val 
     pos = value
     check_load_stall
     if( tail_parked ) {
-        if(session.consumer.close_on_drain) {
+        if(consumer.close_on_drain) {
           close
         }
     }
@@ -1933,7 +1941,7 @@ class Subscription(val queue:Queue, val 
 
   def tail_parked = pos eq queue.tail_entry
 
-  def matches(entry:Delivery) = session.consumer.matches(entry)
+  def matches(entry:Delivery) = consumer.matches(entry)
   def full = session.full
 
   def offer(delivery:Delivery) = try {
@@ -2032,10 +2040,7 @@ class Subscription(val queue:Queue, val 
         debug("Internal protocol error: message delivery acked/nacked multiple times: "+entry.seq)
         return
       }
-      // The session may have already been closed..
-      if( session == null ) {
-        return;
-      }
+
       total_ack_count += 1
       if (entry.messageKey != -1) {
         val storeBatch = if( uow == null ) {
@@ -2063,6 +2068,8 @@ class Subscription(val queue:Queue, val 
 
       queue.trigger_swap
       next.run
+      check_finish_close
+      
     }
 
     def nack:Unit = {
@@ -2071,10 +2078,6 @@ class Subscription(val queue:Queue, val 
         warn("Internal protocol error: message delivery acked/nacked multiple times: "+entry.seq)
         return
       }
-      // The session may have already been closed..
-      if( session == null ) {
-        return;
-      }
 
       total_nack_count += 1
       entry.as_loaded.acquired = false
@@ -2098,6 +2101,7 @@ class Subscription(val queue:Queue, val 
 
       }
       unlink()
+      check_finish_close
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1213481&r1=1213480&r2=1213481&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Mon Dec 12 22:49:17 2011
@@ -795,6 +795,13 @@ class OpenwireProtocolHandler extends Pr
       override def time_stamp = broker.now
     }
 
+    override def dispose() = dispatchQueue {
+      ack_handler.close
+      super.dispose()
+      sink_manager.close(consumer_sink)
+    }
+    
+    
     override def exclusive = info.isExclusive
     override def browser = info.isBrowser
 
@@ -969,9 +976,26 @@ class OpenwireProtocolHandler extends Pr
       // TODO: Need to validate all the range ack cases...
       var consumer_acks = ListBuffer[(MessageId,TrackedAck)]()
 
+      def close = {
+        queue.assertExecuting()
+        consumer_acks.foreach { case(_, tack) =>
+          if( tack.ack !=null ) {
+            tack.ack(Delivered, null)
+          }
+        }
+        consumer_acks = null
+      }
+
       def track(msgid:MessageId, ack:(DeliveryResult, StoreUOW)=>Unit) = {
         queue.assertExecuting()
-        consumer_acks += msgid -> new TrackedAck(ack)
+        if( consumer_acks==null ) {
+          // It can happen if we get closed.. but destination is still sending data..
+          if( ack!=null ) {
+            ack(Undelivered, null)
+          }
+        } else {
+          consumer_acks += msgid -> new TrackedAck(ack)
+        }
       }
 
       def credit(messageAck: MessageAck):Unit = {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1213481&r1=1213480&r2=1213481&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Mon Dec 12 22:49:17 2011
@@ -173,7 +173,7 @@ class StompProtocolHandler extends Proto
       starting_seq=seq
     }
 
-    val ack_source = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
+    val credit_window_source = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
       def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
         if( previous == null ) {
           event
@@ -184,25 +184,35 @@ class StompProtocolHandler extends Proto
       def mergeEvents(previous:(Int, Int), events:(Int, Int)) = mergeEvent(previous, events)
     }, dispatch_queue)
 
-    ack_source.setEventHandler(^ {
-      val data = ack_source.getData
+    credit_window_source.setEventHandler(^ {
+      val data = credit_window_source.getData
       credit_window_filter.credit(data._1, data._2)
     });
-    ack_source.resume
+    credit_window_source.resume
 
     trait AckHandler {
       def track(delivery:Delivery):Unit
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit
       def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
+      def close:Unit
     }
 
     class AutoAckHandler extends AckHandler {
+      var closed = false
+
+      def close = { closed  = true}
 
       def track(delivery:Delivery) = {
-        if( delivery.ack!=null ) {
-          delivery.ack(Consumed, null)
+        if( closed ) {
+          if( delivery.ack!=null ) {
+            delivery.ack(Undelivered, null)
+          }
+        } else {
+          if( delivery.ack!=null ) {
+            delivery.ack(Consumed, null)
+          }
+          credit_window_source.merge((delivery.size, 1))
         }
-        ack_source.merge((delivery.size, 1))
       }
 
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
@@ -219,13 +229,30 @@ class StompProtocolHandler extends Proto
     class SessionAckHandler extends AckHandler{
       var consumer_acks = ListBuffer[(AsciiBuffer, TrackedAck)]()
 
+      def close = {
+        queue.assertExecuting()
+        consumer_acks.foreach { case(_, tack) =>
+          if( tack.ack !=null ) {
+            tack.ack(Delivered, null)
+          }
+        }
+        consumer_acks = null
+      }
+
       def track(delivery:Delivery) = {
         queue.assertExecuting()
-        if( protocol_version eq V1_0 ) {
-          // register on the connection since 1.0 acks may not include the subscription id
-          connection_ack_handlers += ( delivery.message.id-> this )
+        if( consumer_acks == null ) {
+          // It can happen if we get closed.. but destination is still sending data..
+          if( delivery.ack!=null ) {
+            delivery.ack(Undelivered, null)
+          }
+        } else {
+          if( protocol_version eq V1_0 ) {
+            // register on the connection since 1.0 acks may not include the subscription id
+            connection_ack_handlers += ( delivery.message.id-> this )
+          }
+          consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack )
         }
-        consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack )
       }
 
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
@@ -243,19 +270,20 @@ class StompProtocolHandler extends Proto
 
           for( (id, delivery) <- acked ) {
             for( credit <- delivery.credit ) {
-              ack_source.merge((credit, 1))
+              credit_window_source.merge((credit, 1))
               delivery.credit = None
             }
           }
         } else {
           if( credit_value!=null ) {
-            ack_source.merge((credit_value._1, credit_value._2))
+            credit_window_source.merge((credit_value._1, credit_value._2))
           }
         }
       }
 
       def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
         queue.assertExecuting()
+        assert(consumer_acks !=null)
 
         // session acks ack all previously received messages..
         var found = false
@@ -289,13 +317,30 @@ class StompProtocolHandler extends Proto
     class MessageAckHandler extends AckHandler {
       var consumer_acks = HashMap[AsciiBuffer, TrackedAck]()
 
+      def close = {
+        queue.assertExecuting()
+        consumer_acks.foreach { case(_, tack) =>
+          if( tack.ack !=null ) {
+            tack.ack(Delivered, null)
+          }
+        }
+        consumer_acks = null
+      }
+
       def track(delivery:Delivery) = {
         queue.assertExecuting();
-        if( protocol_version eq V1_0 ) {
-          // register on the connection since 1.0 acks may not include the subscription id
-          connection_ack_handlers += ( delivery.message.id-> this )
+        if( consumer_acks == null ) {
+          // It can happen if we get closed.. but destination is still sending data..
+          if( delivery.ack!=null ) {
+            delivery.ack(Undelivered, null)
+          }
+        } else {
+          if( protocol_version eq V1_0 ) {
+            // register on the connection since 1.0 acks may not include the subscription id
+            connection_ack_handlers += ( delivery.message.id-> this )
+          }
+          consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack)
         }
-        consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack)
       }
 
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
@@ -303,19 +348,20 @@ class StompProtocolHandler extends Proto
         if( initial_credit_window._3 ) {
           for( delivery <- consumer_acks.get(msgid)) {
             for( credit <- delivery.credit ) {
-              ack_source.merge((credit,1))
+              credit_window_source.merge((credit,1))
               delivery.credit = None
             }
           }
         } else {
           if( credit_value!=null ) {
-            ack_source.merge((credit_value._1, credit_value._2))
+            credit_window_source.merge((credit_value._1, credit_value._2))
           }
         }
       }
 
       def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
         queue.assertExecuting()
+        assert(consumer_acks !=null)
         consumer_acks.remove(msgid) match {
           case Some(delivery) =>
             if( delivery.ack!=null ) {
@@ -364,6 +410,7 @@ class StompProtocolHandler extends Proto
     }
 
     override def dispose() = dispatchQueue {
+      ack_handler.close
       super.dispose()
       sink_manager.close(consumer_sink)
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1213481&r1=1213480&r2=1213481&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Mon Dec 12 22:49:17 2011
@@ -251,6 +251,59 @@ class Stomp11HeartBeatTest extends Stomp
 
 class StompDestinationTest extends StompTestSupport {
 
+  // This is the test case for https://issues.apache.org/jira/browse/APLO-88
+  test("ACK then socket close with/without DISCONNECT, should still ACK") {
+    for(i <- 1 until 3) {
+      connect("1.1")
+
+      def send(id:Int) = {
+        client.write(
+          "SEND\n" +
+          "destination:/queue/from-seq-end\n" +
+          "message-id:id-"+i+"-"+id+"\n"+
+          "receipt:0\n"+
+          "\n")
+        wait_for_receipt("0")
+      }
+
+      def get(seq:Long) = {
+        val frame = client.receive()
+        frame should startWith("MESSAGE\n")
+        frame should include("message-id:id-"+i+"-"+seq+"\n")
+        client.write(
+          "ACK\n" +
+          "subscription:0\n" +
+          "message-id:id-"+i+"-"+seq+"\n" +
+          "\n")
+      }
+
+      send(1)
+      send(2)
+
+      client.write(
+        "SUBSCRIBE\n" +
+        "destination:/queue/from-seq-end\n" +
+        "id:0\n" +
+        "ack:client\n"+
+        "\n")
+      get(1)
+      client.write(
+        "DISCONNECT\n" +
+        "\n")
+      client.close
+
+      connect("1.1")
+      client.write(
+        "SUBSCRIBE\n" +
+        "destination:/queue/from-seq-end\n" +
+        "id:0\n" +
+        "ack:client\n"+
+        "\n")
+      get(2)
+      client.close
+    }
+  }
+
   test("Setting `from-seq` header to -1 results in subscription starting at end of the queue.") {
     connect("1.1")
 



Mime
View raw message