activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1233706 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Date Fri, 20 Jan 2012 00:38:41 GMT
Author: chirino
Date: Fri Jan 20 00:38:41 2012
New Revision: 1233706

URL: http://svn.apache.org/viewvc?rev=1233706&view=rev
Log:
Fixes issue where message could get 'stuck' on the queue.  Also make sure that the msg seq
field is set on the Delivery object when it's get reloaded from the DB.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1233706&r1=1233705&r2=1233706&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Jan 20 00:38:41 2012
@@ -1647,6 +1647,7 @@ class QueueEntry(val queue:Queue, val se
         queue.swapping_in_size -= size
 
         val delivery = new Delivery()
+        delivery.seq = seq
         delivery.message = ProtocolFactory.get(messageRecord.protocol.toString).get.decode(messageRecord)
         delivery.size = messageRecord.size
         delivery.storeKey = messageRecord.key

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1233706&r1=1233705&r2=1233706&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Fri Jan 20 00:38:41 2012
@@ -191,18 +191,36 @@ class SinkMux[T](val downstream:Sink[T])
     }
   }
 
-  def open():Sink[T] = {
-    val sink = new Sink[T] {
-      var refiller:Runnable = NOOP
-      def offer(value: T) = downstream.offer(value)
-      def full = downstream.full
+  class ManagedSink extends Sink[T] {
+
+    var closed = false
+    var refiller:Runnable = NOOP
+
+    def offer(value: T) = {
+      if ( closed ) {
+        true
+      } else {
+        downstream.offer(value)
+      }
     }
+    def full = !closed && downstream.full
+  }
+  
+  def open():Sink[T] = {
+    val sink = new ManagedSink()
     sinks += sink
     sink
   }
 
   def close(sink:Sink[T]):Unit = {
-    sinks -= sink
+    sink match {
+      case sink:ManagedSink =>
+        sink.closed = true
+        sink.refiller.run()
+        sinks -= sink
+      case _ =>
+        error("We did not open that sink")
+    }
   }
 }
 
@@ -210,8 +228,14 @@ class CreditWindowFilter[T](val downstre
 
   var byte_credits = 0
   var delivery_credits = 0
+  var disabled = true
 
-  override def full: Boolean = downstream.full || ( byte_credits <= 0 && delivery_credits
<= 0 )
+  override def full: Boolean = downstream.full || ( disabled && byte_credits <=
0 && delivery_credits <= 0 )
+
+  def disable = {
+    disabled = false
+    refiller.run()
+  }
 
   def passing(value: T) = {
     byte_credits -= sizer.size(value)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1233706&r1=1233705&r2=1233706&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Fri Jan 20 00:38:41 2012
@@ -415,8 +415,9 @@ class StompProtocolHandler extends Proto
 
     override def dispose() = dispatchQueue {
       ack_handler.close
-      super.dispose()
+      credit_window_filter.disable
       sink_manager.close(consumer_sink)
+      super.dispose()
     }
 
     def dispatch_queue = StompProtocolHandler.this.dispatchQueue



Mime
View raw message