Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 50487D5E8 for ; Fri, 15 Feb 2013 15:42:34 +0000 (UTC) Received: (qmail 97140 invoked by uid 500); 15 Feb 2013 15:42:32 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 97054 invoked by uid 500); 15 Feb 2013 15:42:31 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 97041 invoked by uid 99); 15 Feb 2013 15:42:31 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Feb 2013 15:42:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Feb 2013 15:42:29 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 476BC2388AB9; Fri, 15 Feb 2013 15:42:10 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1446636 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ apollo-op... Date: Fri, 15 Feb 2013 15:42:09 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130215154210.476BC2388AB9@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Feb 15 15:42:09 2013 New Revision: 1446636 URL: http://svn.apache.org/r1446636 Log: When messages were being moved to the DLQ they would occasionally get into a bad state. We now handle the case were a swap out gets canceled much better. Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1446636&r1=1446635&r2=1446636&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Fri Feb 15 15:42:09 2013 @@ -787,14 +787,11 @@ class AmqpProtocolHandler extends Protoc class AmqpProducerRoute(val id:Long, val receiver: Receiver, val addresses: Array[SimpleAddress]) extends DeliveryProducerRoute(host.router) with ProducerSupport { val key = addresses.toList - var is_connected = false override def send_buffer_size = buffer_size override def connection = Some(AmqpProtocolHandler.this.connection) - override def connected() = is_connected = true - override def dispatch_queue = queue val producer_overflow = new OverflowSink[Delivery](this) { Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1446636&r1=1446635&r2=1446636&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Feb 15 15:42:09 2013 @@ -439,8 +439,9 @@ class Queue(val router: LocalRouter, val rc.consumer_count = cur.parked.size rc.is_prefetched = cur.prefetched rc.state = cur.label - if( cur.acquiring_subscription != null ) { - rc.acquirer = cur.acquiring_subscription.create_link_dto(false) + rc.acquirer = cur.acquiring_subscription match { + case sub:Subscription => sub.create_link_dto(false) + case _ => null } rc } @@ -700,9 +701,12 @@ class Queue(val router: LocalRouter, val queue_delivery.seq = entry.seq entry.init(queue_delivery) - if( tune_persistent ) { - queue_delivery.uow = delivery.uow + val uow = if( tune_persistent ) { + delivery.uow + } else { + null } + queue_delivery.uow = uow entries.addLast(entry) enqueue_item_counter += 1 @@ -713,10 +717,9 @@ class Queue(val router: LocalRouter, val enqueue_remaining_take(entry.size) // Do we need to do a persistent enqueue??? - val persisted = queue_delivery.uow != null - if (persisted) { + if (uow != null) { entry.state match { - case state:entry.Loaded => state.store + case state:entry.Loaded => state.store_enqueue case state:entry.Swapped => delivery.uow.enqueue(entry.toQueueEntryRecord) } } @@ -730,7 +733,7 @@ class Queue(val router: LocalRouter, val if( entry.isLinked ) { if( !consumers_keeping_up_historically ) { entry.swap(true) - } else if( entry.as_loaded.is_acquired && persisted) { + } else if( entry.is_acquired && uow != null) { // If the message as dispatched and it's marked to get persisted anyways, // then it's ok if it falls out of memory since we won't need to load it again. entry.swap(false) @@ -742,8 +745,8 @@ class Queue(val router: LocalRouter, val } // release the store batch... - if (persisted) { - queue_delivery.uow.release + if (uow != null) { + uow.release queue_delivery.uow = null } @@ -1070,19 +1073,27 @@ class Queue(val router: LocalRouter, val override def connection = None override def dispatch_queue = Queue.this.dispatch_queue } + var dlq_route:DlqProducerRoute = _ - + var dlq_overflow:OverflowSink[(Delivery, (StoreUOW)=>Unit)] = _ + def dead_letter(original_uow:StoreUOW, entry:QueueEntry)(removeFunc: (StoreUOW)=>Unit) = { assert_executing if( config.dlq==null ) { - removeFunc(original_uow) + removeFunc(null) } else { - def complete(delivery:Delivery) = { - delivery.uow = original_uow - delivery.ack = (result, uow) => { - dispatch_queue { - removeFunc(uow) + def complete(original_delivery:Delivery) = { + assert_executing + val delivery = original_delivery.copy() + delivery.uow = if(delivery.storeKey == -1) { + null + } else { + if( original_uow == null ) { + create_uow + } else { + original_uow.retain() + original_uow } } delivery.expiration=0 @@ -1093,34 +1104,47 @@ class Queue(val router: LocalRouter, val router.virtual_host.dispatch_queue { val rc = router.connect(dlq_route.addresses, dlq_route, null) assert( rc == None ) // Not expecting this to ever fail. - dlq_route.dispatch_queue { - dlq_route.offer(delivery) + } + + dlq_overflow = new OverflowSink[(Delivery, (StoreUOW)=>Unit)](dlq_route.flatMap{ x => + Some(x._1) + }) { + override protected def onDelivered(value: (Delivery, (StoreUOW) => Unit)) { + val (delivery, callback) = value; + callback(delivery.uow) + if( delivery.uow!=null ) { + delivery.uow.release() + } } } - } else { - dlq_route.offer(delivery) } + dlq_overflow.offer((delivery, removeFunc)) } entry.state match { case x:entry.Loaded=> if( x.swapping_out ) { + x.acquirer = DeadLetterHandler x.on_swap_out ::=( ()=> { - complete(entry.state.asInstanceOf[entry.Swapped].to_delivery) + complete(entry.state match { + case state:entry.Swapped=> + state.to_delivery + case state:entry.Loaded => + state.delivery + case state => sys.error("Unexpected type: "+state) + }) }) } else { - complete(x.delivery.copy()) + complete(x.delivery) } case x:entry.Swapped=> complete(x.to_delivery) case _ => throw new Exception("Invalid queue entry state, it cannot be DQLed.") } - } } - - + def drain_acks = might_unfill { val end = System.nanoTime() ack_source.getData.foreach { @@ -1145,7 +1169,7 @@ class Queue(val router: LocalRouter, val var limit = dlq_nak_limit if( limit>0 && entry.entry.redelivery_count >= limit ) { dead_letter(uow, entry.entry) { uow => - entry.ack(uow) + entry.remove(uow) } } else { entry.nack Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1446636&r1=1446635&r2=1446636&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Fri Feb 15 15:42:09 2013 @@ -298,7 +298,7 @@ class QueueEntry(val queue:Queue, val se * Is the entry acquired by a subscription. */ def is_acquired = acquiring_subscription!=null - def acquiring_subscription:Subscription = null + def acquiring_subscription:Acquirer = null /** * @returns true if the entry is either swapped or swapping. @@ -396,17 +396,17 @@ class QueueEntry(val queue:Queue, val se * The entry is in this state while a message is loaded in memory. A message must be in this state * before it can be dispatched to a subscription. */ - class Loaded(val delivery: Delivery, var stored:Boolean, var space:MemorySpace) extends EntryState { + class Loaded(val delivery: Delivery, var enqueue_stored:Boolean, var space:MemorySpace) extends EntryState { assert( delivery!=null, "delivery cannot be null") - var acquirer:Subscription = _ + var acquirer:Acquirer = _ override def acquiring_subscription = acquirer override def memory_space = space var swapping_out = false - var storing = false + var storing_enqueue = false queue.loaded_items += 1 queue.loaded_size += size @@ -422,7 +422,7 @@ class QueueEntry(val queue:Queue, val se rc } - override def toString = { "loaded:{ stored: "+stored+", swapping_out: "+swapping_out+", acquired: "+acquirer+", size:"+size+"}" } + override def toString = { "loaded:{ enqueue_stored: "+enqueue_stored+", swapping_out: "+swapping_out+", acquired: "+acquirer+", size:"+size+"}" } override def count = 1 override def size = delivery.size @@ -442,11 +442,11 @@ class QueueEntry(val queue:Queue, val se override def as_loaded = this - def store = { - // We should no longer be storing stuff if stopped. + def store_enqueue = { assert(queue.service_state.is_starting_or_started) - if(!stored && !storing) { - storing = true + if(!enqueue_stored && !storing_enqueue) { + storing_enqueue = true + assert( delivery.uow!=null ) delivery.uow.enqueue(toQueueEntryRecord) queue.swapping_out_size+=size delivery.uow.on_flush { canceled => @@ -463,37 +463,31 @@ class QueueEntry(val queue:Queue, val se override def swap_out(asap:Boolean) = { if( queue.tune_swap && !swapping_out ) { - swapping_out=true - - if( stored ) { - swapped_out(false) + if( enqueue_stored ) { + switch_to_swapped } else { - - // The storeBatch is only set when called from the messages.offer method + swapping_out=true if( delivery.uow!=null ) { + assert( delivery.storeKey != -1 ) if( asap ) { delivery.uow.complete_asap } } else { - // Are we swapping out a non-persistent message? - if( !storing ) { - assert( delivery.storeKey == -1 ) - + if( delivery.storeKey == -1 ) { val uow = queue.create_uow delivery.uow = uow delivery.storeLocator = new AtomicReference[Object]() delivery.storeKey = uow.store(delivery.createMessageRecord ) - store + store_enqueue if( asap ) { uow.complete_asap } - uow.release - delivery.uow = null - + uow.release() } else { + store_enqueue if( asap ) { - queue.virtual_host.store.flush_message(message_key) { + queue.virtual_host.store.flush_message(delivery.storeKey) { } } } @@ -512,31 +506,26 @@ class QueueEntry(val queue:Queue, val se } } - def swapped_out(store_wrote_to_disk:Boolean) = { + def swapped_out(not_canceled:Boolean) = { assert( state == this ) - storing = false - stored = true + assert( delivery.storeKey != -1 ) + storing_enqueue = false + + if( not_canceled ) { + enqueue_stored = true + } else { + delivery.storeKey = -1 + } + delivery.uow = null + if( swapping_out ) { swapping_out = false - space -= delivery - - if( store_wrote_to_disk ) { + if( not_canceled ) { queue.swap_out_size_counter += size queue.swap_out_item_counter += 1 + switch_to_swapped } - - state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count, acquirer, sender) - if( can_combine_with_prev ) { - getPrevious.as_swapped_range.combineNext - } - if( remove_pending ) { - state.remove - } else { - queue.loaded_items -= 1 - queue.loaded_size -= size - } - fire_swap_out_watchers } else { if( remove_pending ) { @@ -547,6 +536,20 @@ class QueueEntry(val queue:Queue, val se } } + def switch_to_swapped = { + space -= delivery + state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count, acquirer, sender) + if( remove_pending ) { + state.remove + } else { + if( can_combine_with_prev ) { + getPrevious.as_swapped_range.combineNext + } + queue.loaded_items -= 1 + queue.loaded_size -= size + } + } + override def swap_in(space:MemorySpace) = { if(space ne this.space) { this.space -= delivery @@ -562,7 +565,7 @@ class QueueEntry(val queue:Queue, val se queue.loaded_items -= 1 queue.loaded_size -= size } - if( storing | remove_pending ) { + if( storing_enqueue | remove_pending ) { remove_pending = true } else { delivery.message.release @@ -720,7 +723,9 @@ class QueueEntry(val queue:Queue, val se * entry is persisted, it can move into this state. This state only holds onto the * the massage key so that it can reload the message from the store quickly when needed. */ - class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override val sender:List[DestinationAddress]) extends EntryState { + class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Acquirer, override val sender:List[DestinationAddress]) extends EntryState { + + assert( message_key!= -1 ) queue.individual_swapped_items += 1 Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1446636&r1=1446635&r2=1446636&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Fri Feb 15 15:42:09 2013 @@ -216,8 +216,18 @@ abstract class DeliveryProducerRoute(rou } else { null } + var is_connected = false def connected() = defer { + is_connected = true + if( overflow!=null ) { + val t = overflow + overflow = null + _offer(t) + if( refiller!=null && !full ) { + refiller.run() + } + } on_connected } @@ -258,6 +268,7 @@ abstract class DeliveryProducerRoute(rou debug("producer route detaching from consumer.") x.close } + is_connected = false } protected def on_connected = {} @@ -276,76 +287,89 @@ abstract class DeliveryProducerRoute(rou def full = overflow!=null - def offer(delivery:Delivery) = { + def offer(delivery:Delivery):Boolean = { dispatch_queue.assertExecuting() if( full ) { false } else { - last_send = Broker.now + if (delivery.uow != null) { + delivery.uow.retain() + } + if ( !is_connected ) { + overflow = delivery + } else { + _offer(delivery) + } + return true + } + } - // Do we need to store the message if we have a matching consumer? - var matching_targets = 0 - val original_ack = delivery.ack - val copy = delivery.copy - - if ( original_ack!=null ) { - copy.ack = (result, uow)=> { - defer { - matching_targets -= 1 - if ( matching_targets<= 0 && copy.ack!=null ) { - copy.ack = null - if (delivery.uow != null) { - delivery.uow.on_complete { - defer { - original_ack(Consumed, null) - } + private def _offer(delivery:Delivery):Boolean = { + last_send = Broker.now + + // Do we need to store the message if we have a matching consumer? + var matching_targets = 0 + val original_ack = delivery.ack + val copy = delivery.copy + copy.uow = delivery.uow + + if ( original_ack!=null ) { + copy.ack = (result, uow)=> { + defer { + matching_targets -= 1 + if ( matching_targets<= 0 && copy.ack!=null ) { + copy.ack = null + if (delivery.uow != null) { + delivery.uow.on_complete { + defer { + original_ack(Consumed, null) } - } else { - original_ack(Consumed, null) } + } else { + original_ack(Consumed, null) } } } } + } - if(copy.message!=null) { - copy.message.retain - } - - targets.foreach { target=> + if(copy.message!=null) { + copy.message.retain + } - // only deliver to matching consumers - if( target.consumer.matches(copy) ) { - matching_targets += 1 - if ( target.consumer.is_persistent && copy.persistent && store != null) { + targets.foreach { target=> - if (copy.uow == null) { - copy.uow = store.create_uow - } + // only deliver to matching consumers + if( target.consumer.matches(copy) ) { + matching_targets += 1 + if ( target.consumer.is_persistent && copy.persistent && store != null) { - if( copy.storeKey == -1L ) { - copy.storeLocator = new AtomicReference[Object]() - copy.storeKey = copy.uow.store(copy.createMessageRecord) - } + if (copy.uow == null) { + copy.uow = store.create_uow } - if( !target.offer(copy) ) { - overflowSessions ::= target + if( copy.storeKey == -1L ) { + copy.storeLocator = new AtomicReference[Object]() + copy.storeKey = copy.uow.store(copy.createMessageRecord) } } - } - if ( matching_targets == 0 && original_ack!=null ) { - original_ack(Consumed, null) + if( !target.offer(copy) ) { + overflowSessions ::= target + } } + } - if( overflowSessions!=Nil ) { - overflow = copy - } else { - release(copy) - } - true + if ( matching_targets == 0 && original_ack!=null ) { + original_ack(Consumed, null) } + + if( overflowSessions!=Nil ) { + overflow = copy + } else { + release(copy) + } + true } @@ -359,19 +383,23 @@ abstract class DeliveryProducerRoute(rou } val drainer = ^{ - if( overflow!=null ) { - val original = overflowSessions; - overflowSessions = Nil - original.foreach { target=> - if( !target.offer(overflow) ) { - overflowSessions ::= target + if( is_connected ) { + if( overflow!=null ) { + val original = overflowSessions; + overflowSessions = Nil + original.foreach { target=> + if( !target.offer(overflow) ) { + overflowSessions ::= target + } } - } - if( overflowSessions==Nil ) { - release(overflow) - overflow = null - if(refiller!=null) - refiller.run + if( overflowSessions==Nil ) { + release(overflow) + overflow = null + if(refiller!=null) + refiller.run + } + } else if(refiller!=null) { + refiller.run } } } Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1446636&r1=1446635&r2=1446636&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Fri Feb 15 15:42:09 2013 @@ -23,6 +23,9 @@ import org.apache.activemq.apollo.util._ import org.apache.activemq.apollo.util.list._ import org.apache.activemq.apollo.dto.QueueConsumerLinkDTO +trait Acquirer +object DeadLetterHandler extends Acquirer + /** * @author Hiram Chirino */ @@ -35,7 +38,7 @@ object Subscription extends Log * * @author Hiram Chirino */ -class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer with Dispatched with StallCheckSupport { +class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends Acquirer with DeliveryProducer with Dispatched with StallCheckSupport { import Subscription._ def dispatch_queue = queue.dispatch_queue @@ -366,14 +369,18 @@ class Subscription(val queue:Queue, val def ack(uow:StoreUOW):Unit = { assert_executing if(!isLinked) { - debug("Unexpected ack: message seq allready acked: "+entry.seq) + debug("Unexpected ack: message seq already acked: "+entry.seq) return } - val next = entry.getNext - total_ack_count += 1 total_ack_size += entry.size + remove(uow) + } + + def remove(uow:StoreUOW):Unit = { + assert_executing + val next = entry.getNext entry.dequeue(uow) // removes this entry from the acquired list. Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1446636&r1=1446635&r2=1446636&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala Fri Feb 15 15:42:09 2013 @@ -863,9 +863,16 @@ case class MqttSession(host_state:HostSt override def send_buffer_size = handler.codec.getReadBufferSize override def connection = Some(handler.connection) override def dispatch_queue = queue - refiller = ^{ - handler.resume_read + + var suspended = false + + refiller = ^ { + if( suspended ) { + suspended = false + handler.resume_read + } } + } def on_mqtt_publish(publish:PUBLISH):Unit = { @@ -907,7 +914,7 @@ case class MqttSession(host_state:HostSt } } - def send_via_route(route:DeliveryProducerRoute, publish:PUBLISH):Unit = { + def send_via_route(route:MqttProducerRoute, publish:PUBLISH):Unit = { queue.assertExecuting() def at_least_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue { @@ -952,6 +959,7 @@ case class MqttSession(host_state:HostSt route.offer(delivery) if( route.full ) { // but once it gets full.. suspend to flow control the producer. + route.suspended = true handler.get.suspend_read("blocked sending to: "+route.overflowSessions.mkString(", ")) } Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1446636&r1=1446635&r2=1446636&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Fri Feb 15 15:42:09 2013 @@ -632,8 +632,13 @@ class OpenwireProtocolHandler extends Pr override def connection = Some(OpenwireProtocolHandler.this.connection) override def dispatch_queue = queue + var suspended = false + refiller = ^ { - resume_read + if( suspended ) { + suspended = false + resume_read + } } } @@ -645,6 +650,9 @@ class OpenwireProtocolHandler extends Pr val addresses = to_destination_dto(msg.getDestination, this) val route = OpenwireDeliveryProducerRoute(addresses) + if( uow!=null ) { + uow.retain() + } // don't process frames until producer is connected... suspend_read("connecting producer route") host.dispatch_queue { @@ -660,6 +668,9 @@ class OpenwireProtocolHandler extends Pr send_via_route(route, msg, uow) } } + if( uow!=null ) { + uow.release() + } } } @@ -670,7 +681,7 @@ class OpenwireProtocolHandler extends Pr } } - def send_via_route(route:DeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = { + def send_via_route(route:OpenwireDeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = { if( !route.targets.isEmpty ) { // We may need to add some headers.. @@ -701,6 +712,7 @@ class OpenwireProtocolHandler extends Pr if( route.full ) { // but once it gets full.. suspend, so that we get more messages // until it's not full anymore. + route.suspended = true suspend_read("blocked destination: "+route.overflowSessions.mkString(", ")) } Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1446636&r1=1446635&r2=1446636&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Feb 15 15:42:09 2013 @@ -1158,10 +1158,13 @@ class StompProtocolHandler extends Proto override def connection = Some(StompProtocolHandler.this.connection) override def dispatch_queue = queue - + var suspended = false refiller = ^ { - resume_read + if( suspended ) { + resume_read + suspended = false + } } @@ -1239,7 +1242,10 @@ class StompProtocolHandler extends Proto val trimmed_dest = dest.deepCopy().ascii() // create the producer route... val route = new StompProducerRoute(trimmed_dest) // don't process frames until producer is connected... - connection.transport.suspendRead + suspend_read("Connecting to destination") + if( uow !=null ) { + uow.retain() + } host.dispatch_queue { val rc = host.router.connect(route.addresses, route, security_context) dispatchQueue { @@ -1254,6 +1260,9 @@ class StompProtocolHandler extends Proto send_via_route(route.addresses, route, frame, uow) } } + if( uow !=null ) { + uow.release() + } } } @@ -1357,7 +1366,7 @@ class StompProtocolHandler extends Proto rc } - def send_via_route(addresses: Array[SimpleAddress], route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = { + def send_via_route(addresses: Array[SimpleAddress], route:StompProducerRoute, frame:StompFrame, uow:StoreUOW) = { var storeBatch:StoreUOW=null // User might be asking for ack that we have processed the message.. @@ -1402,6 +1411,7 @@ class StompProtocolHandler extends Proto if( route.full ) { // but once it gets full.. suspend, so that we get more stomp messages // until it's not full anymore. + route.suspended = true suspend_read("blocked sending to: "+route.overflowSessions.mkString(", ")) } Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala?rev=1446636&r1=1446635&r2=1446636&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/DeadLetterQueueLoadTest.scala Fri Feb 15 15:42:09 2013 @@ -26,14 +26,14 @@ class DeadLetterQueueLoadTest extends St override def broker_config_uri: String = "xml:classpath:apollo-stomp-bdb.xml" for (i <- 1 to 16 ) - test("naker."+i) { + test("naker.load."+i) { connect("1.1") val dlq_client = connect("1.1", new StompClient) - subscribe("0", "/queue/nacker."+i, "client", false, "", false) - subscribe("dlq", "/queue/dlq.nacker."+i, "auto", false, "", false, c=dlq_client) + subscribe("0", "/queue/nacker.load."+i, "client", false, "", false) + subscribe("dlq", "/queue/dlq.nacker.load."+i, "client", false, "", false, c=dlq_client) for( j <- 1 to 1000 ) { - async_send("/queue/nacker."+i, j) + async_send("/queue/nacker.load."+i, j) assert_received(j, "0")(false) assert_received(j, "0")(false) // It should be sent to the DLQ after the 2nd nak