Author: chirino
Date: Fri Jan 20 00:38:41 2012
New Revision: 1233706
URL: http://svn.apache.org/viewvc?rev=1233706&view=rev
Log:
Fixes an issue where a message could get 'stuck' on the queue. Also makes sure that the msg seq
field is set on the Delivery object when it gets reloaded from the DB.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1233706&r1=1233705&r2=1233706&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Jan 20 00:38:41 2012
@@ -1647,6 +1647,7 @@ class QueueEntry(val queue:Queue, val se
queue.swapping_in_size -= size
val delivery = new Delivery()
+ delivery.seq = seq
delivery.message = ProtocolFactory.get(messageRecord.protocol.toString).get.decode(messageRecord)
delivery.size = messageRecord.size
delivery.storeKey = messageRecord.key
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1233706&r1=1233705&r2=1233706&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Fri Jan 20 00:38:41 2012
@@ -191,18 +191,36 @@ class SinkMux[T](val downstream:Sink[T])
}
}
- def open():Sink[T] = {
- val sink = new Sink[T] {
- var refiller:Runnable = NOOP
- def offer(value: T) = downstream.offer(value)
- def full = downstream.full
+ class ManagedSink extends Sink[T] {
+
+ var closed = false
+ var refiller:Runnable = NOOP
+
+ def offer(value: T) = {
+ if ( closed ) {
+ true
+ } else {
+ downstream.offer(value)
+ }
}
+ def full = !closed && downstream.full
+ }
+
+ def open():Sink[T] = {
+ val sink = new ManagedSink()
sinks += sink
sink
}
def close(sink:Sink[T]):Unit = {
- sinks -= sink
+ sink match {
+ case sink:ManagedSink =>
+ sink.closed = true
+ sink.refiller.run()
+ sinks -= sink
+ case _ =>
+ error("We did not open that sink")
+ }
}
}
@@ -210,8 +228,14 @@ class CreditWindowFilter[T](val downstre
var byte_credits = 0
var delivery_credits = 0
+ var disabled = true
- override def full: Boolean = downstream.full || ( byte_credits <= 0 && delivery_credits
<= 0 )
+ override def full: Boolean = downstream.full || ( disabled && byte_credits <=
0 && delivery_credits <= 0 )
+
+ def disable = {
+ disabled = false
+ refiller.run()
+ }
def passing(value: T) = {
byte_credits -= sizer.size(value)
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1233706&r1=1233705&r2=1233706&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Fri Jan 20 00:38:41 2012
@@ -415,8 +415,9 @@ class StompProtocolHandler extends Proto
override def dispose() = dispatchQueue {
ack_handler.close
- super.dispose()
+ credit_window_filter.disable
sink_manager.close(consumer_sink)
+ super.dispose()
}
def dispatch_queue = StompProtocolHandler.this.dispatchQueue
|