activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1308214 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-stomp/src/test/resources/ apollo-stomp/src/test/scala/org/apache/...
Date Sun, 01 Apr 2012 23:03:13 GMT
Author: chirino
Date: Sun Apr  1 23:03:13 2012
New Revision: 1308214

URL: http://svn.apache.org/viewvc?rev=1308214&view=rev
Log:
Fixes APLO-16 : Add support for Dead Letter Queues

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1308214&r1=1308213&r2=1308214&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Sun Apr  1 23:03:13 2012
@@ -31,7 +31,8 @@ import java.lang.UnsupportedOperationExc
 import security.SecuredResource._
 import security.{SecuredResource, SecurityContext}
 import org.apache.activemq.apollo.dto._
-import org.fusesource.hawtbuf.UTF8Buffer
+import org.fusesource.hawtbuf._
+import java.util.regex.Pattern
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -214,6 +215,8 @@ class Queue(val router: LocalRouter, val
 
   var config:QueueDTO = _
 
+  def dlq_nak_limit = OptionSupport(config.nak_limit).getOrElse(0)
+
   def configure(update:QueueDTO) = {
     def mem_size(value:String, default:String) = MemoryPropertyEditor.parse(Option(value).getOrElse(default)).toInt
 
@@ -462,6 +465,14 @@ class Queue(val router: LocalRouter, val
       sub.close()
     }
 
+    if( dql_route!=null ) {
+      val route = dql_route
+      dql_route = null
+      virtual_host.dispatch_queue {
+        router.disconnect(route.addresses, route)
+      }
+    }
+
     trigger_swap
 
     stop_listener_waiting_for_flush = on_completed
@@ -551,7 +562,10 @@ class Queue(val router: LocalRouter, val
         // Do we need to do a persistent enqueue???
         val persisted = queue_delivery.uow != null
         if (persisted) {
-          entry.as_loaded.store
+          entry.state match {
+            case state:entry.Loaded => state.store
+            case state:entry.Swapped => delivery.uow.enqueue(entry.toQueueEntryRecord)
+          }
         }
 
         if( entry.hasSubs ) {
@@ -855,6 +869,60 @@ class Queue(val router: LocalRouter, val
     }
   }
 
+  class DlqProducerRoute(val addresses:Array[ConnectAddress]) extends DeliveryProducerRoute(router)
{
+    override def connection = None
+    override def dispatch_queue = Queue.this.dispatch_queue
+  }
+  var dql_route:DlqProducerRoute = _
+  
+  def dead_letter(original_uow:StoreUOW, entry:QueueEntry)(removeFunc: (StoreUOW)=>Unit)
= {
+
+    if( config.dlq==null ) {
+      removeFunc(original_uow)
+    } else {
+      val delivery:Delivery = entry.state match {
+        case x:entry.Loaded=>
+          x.delivery.copy()
+        case x:entry.Swapped=>
+          x.to_delivery
+        case _ =>
+          throw new Exception("Invalid queue entry state, it cannot be DQLed.")
+      }
+
+      delivery.uow = original_uow
+
+//      delivery.uow = if( tune_persistent ) {
+//        if(original_uow!=null ) {
+//          original_uow
+//        } else {
+//          virtual_host.store.create_uow()
+//        }
+//      } else {
+//        null
+//      }
+
+      delivery.ack = (result, uow) => {
+        removeFunc(uow)
+      }
+
+      if( dql_route==null ) {
+        val dlq = config.dlq.replaceAll(Pattern.quote("*"), id)
+        dql_route = new DlqProducerRoute(Array(SimpleAddress("queue:"+dlq)))
+        router.virtual_host.dispatch_queue {
+          val rc = router.connect(dql_route.addresses, dql_route, null)
+          assert( rc == None ) // Not expecting this to ever fail.
+          dql_route.dispatch_queue {
+            dql_route.offer(delivery)
+          }
+        }
+      } else {
+        dql_route.offer(delivery)
+      }
+
+    }
+  }
+  
+  
   def drain_acks = might_unfill {
     ack_source.getData.foreach {
       case (entry, consumed, uow) =>
@@ -867,14 +935,24 @@ class Queue(val router: LocalRouter, val
             entry.entry.queue.expired(entry.entry, false)
             entry.ack(uow)
           case Delivered =>
+            entry.increment_nack
             entry.entry.redelivered
             entry.nack
           case Poisoned =>
-            // TODO: send to DLQ once that is supported.
+            entry.increment_nack
             entry.entry.redelivered
-            entry.nack
+            var limit = dlq_nak_limit
+            if( limit>0 && entry.entry.redelivery_count >= limit ) {
+              dead_letter(uow, entry.entry) { uow =>
+                dispatch_queue {
+                  entry.ack(uow)
+                }
+              }
+            } else {
+              entry.nack
+            }
           case Undelivered =>
-            entry.nack
+            entry.increment_nack
         }
         if( uow!=null ) {
           uow.release()
@@ -924,7 +1002,9 @@ class Queue(val router: LocalRouter, val
       if (downstream.full) {
         false
       } else {
-        delivery.message.retain
+        if( delivery.message!=null ) {
+          delivery.message.retain
+        }
         if( tune_persistent && delivery.uow!=null ) {
           delivery.uow.retain
         }
@@ -1102,8 +1182,16 @@ class QueueEntry(val queue:Queue, val se
   }
 
   def init(delivery:Delivery):QueueEntry = {
-    queue.producer_swapped_in += delivery
-    state = new Loaded(delivery, false, queue.producer_swapped_in)
+    if( delivery.message == null ) {
+      // This must be a swapped out message which has been previously persisted in
+      // another queue.  We need to enqueue it to this queue..
+      queue.swap_out_size_counter += delivery.size
+      queue.swap_out_item_counter += 1
+      state = new Swapped(delivery.storeKey, delivery.storeLocator, delivery.size, delivery.expiration,
0, null, delivery.sender)
+    } else {
+      queue.producer_swapped_in += delivery
+      state = new Loaded(delivery, false, queue.producer_swapped_in)
+    }
     this
   }
 
@@ -2260,6 +2348,8 @@ class Subscription(val queue:Queue, val 
       
     }
 
+    def increment_nack = total_nack_count += 1
+
     def nack:Unit = {
       assert_executing
       if(!isLinked) {
@@ -2267,7 +2357,6 @@ class Subscription(val queue:Queue, val 
         return
       }
 
-      total_nack_count += 1
       entry.state match {
         case x:entry.Loaded=> x.acquirer = null
         case x:entry.Swapped=> x.acquirer = null

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1308214&r1=1308213&r2=1308214&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Sun Apr  1 23:03:13 2012
@@ -287,8 +287,11 @@ abstract class DeliveryProducerRoute(rou
       // Do we need to store the message if we have a matching consumer?
       pendingAck = delivery.ack
       val copy = delivery.copy
-      copy.message.retain
-
+      
+      if(copy.message!=null) {
+        copy.message.retain
+      }
+      
       targets.foreach { target=>
 
         // only deliver to matching consumers
@@ -340,7 +343,9 @@ abstract class DeliveryProducerRoute(rou
     if (delivery.uow != null) {
       delivery.uow.release
     }
-    delivery.message.release
+    if( delivery.message!=null ) {
+      delivery.message.release
+    }
   }
 
   val drainer = ^{
@@ -355,7 +360,8 @@ abstract class DeliveryProducerRoute(rou
       if( overflowSessions==Nil ) {
         delivered(overflow)
         overflow = null
-        refiller.run
+        if(refiller!=null)
+          refiller.run
       }
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1308214&r1=1308213&r2=1308214&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
Sun Apr  1 23:03:13 2012
@@ -124,6 +124,27 @@ public class QueueDTO extends StringIdDT
     @XmlAnyElement(lax=true)
     public List<Object> other = new ArrayList<Object>();
 
+    /**
+     * Is the dead letter queue configured for the destination.  A
+     * dead letter queue is used for storing messages that failed to get processed
+     * by consumers.  If not set, then messages that fail to get processed
+     * will be dropped.  If '*' appears in the name it will be replaced with
+     * the queue's id.
+     */
+    @XmlAttribute(name="dlq")
+    public String dlq;
+
+    /**
+     * Once a message has been nacked the configured
+     * number of times the message will be considered to be a
+     * poison message and will get moved to the dead letter queue if that's
+     * configured or dropped.  If set to less than one, then the message
+     * will never be considered to be a poison message.
+     * Defaults to zero.
+     */
+    @XmlAttribute(name="nak_limit")
+    public Integer nak_limit;
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -147,6 +168,8 @@ public class QueueDTO extends StringIdDT
         if (swap_range_size != null ? !swap_range_size.equals(queueDTO.swap_range_size) :
queueDTO.swap_range_size != null)
             return false;
         if (mirrored != null ? !mirrored.equals(queueDTO.mirrored) : queueDTO.mirrored !=
null) return false;
+        if (dlq != null ? !dlq.equals(queueDTO.dlq) : queueDTO.dlq != null) return false;
+        if (nak_limit != null ? !nak_limit.equals(queueDTO.nak_limit) : queueDTO.nak_limit
!= null) return false;
 
         return true;
     }
@@ -164,6 +187,8 @@ public class QueueDTO extends StringIdDT
         result = 31 * result + (catchup_enqueue_rate != null ? catchup_enqueue_rate.hashCode()
: 0);
         result = 31 * result + (max_enqueue_rate != null ? max_enqueue_rate.hashCode() :
0);
         result = 31 * result + (other != null ? other.hashCode() : 0);
+        result = 31 * result + (dlq != null ? dlq.hashCode() : 0);
+        result = 31 * result + (nak_limit != null ? nak_limit.hashCode() : 0);
         return result;
     }
 }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml?rev=1308214&r1=1308213&r2=1308214&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml Sun
Apr  1 23:03:13 2012
@@ -21,6 +21,7 @@
   <virtual_host id="default">
     <host_name>localhost</host_name>
 
+    <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
     <queue id="mirrored.**" mirrored="true"/>
 
     <bdb_store directory="${testdatadir}"/>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1308214&r1=1308213&r2=1308214&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
Sun Apr  1 23:03:13 2012
@@ -21,6 +21,7 @@
   <virtual_host id="default">
     <host_name>localhost</host_name>
 
+    <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
     <queue id="mirrored.**" mirrored="true"/>
 
     <leveldb_store directory="${testdatadir}"/>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1308214&r1=1308213&r2=1308214&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Sun Apr
 1 23:03:13 2012
@@ -21,6 +21,7 @@
   <virtual_host id="default">
     <host_name>localhost</host_name>
 
+    <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
     <queue id="mirrored.**" mirrored="true"/>
     <topic id="queued.**" slow_consumer_policy="queue"/>
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala?rev=1308214&r1=1308213&r2=1308214&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
Sun Apr  1 23:03:13 2012
@@ -49,6 +49,7 @@ class StompClient extends ShouldMatchers
     }
     socket.connect(new InetSocketAddress(host, port))
     socket.setSoLinger(true, 1)
+    socket.setSoTimeout(30*1000)
     out = new BufferedOutputStream(socket.getOutputStream, bufferSize)
     in = new BufferedInputStream(socket.getInputStream, bufferSize)
   }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1308214&r1=1308213&r2=1308214&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Sun Apr  1 23:03:13 2012
@@ -95,7 +95,7 @@ class StompTestSupport extends BrokerFun
       body)
   }
 
-  def subscribe(id:String, dest:String, mode:String="auto", persistent:Boolean=false, headers:String="",
c: StompClient = client) = {
+  def subscribe(id:String, dest:String, mode:String="auto", persistent:Boolean=false, headers:String="",
sync:Boolean=true, c: StompClient = client) = {
     val rid = receipt_counter.incrementAndGet()
     c.write(
       "SUBSCRIBE\n" +
@@ -103,10 +103,12 @@ class StompTestSupport extends BrokerFun
       "id:"+id+"\n" +
       (if(persistent) "persistent:true\n" else "")+
       "ack:"+mode+"\n"+
-      "receipt:"+rid+"\n" +
+      (if(sync) "receipt:"+rid+"\n" else "") +
       headers+
       "\n")
-    wait_for_receipt(""+rid, c)
+    if(sync) {
+      wait_for_receipt(""+rid, c)
+    }
   }
 
   def unsubscribe(id:String, headers:String="", c: StompClient=client) = {
@@ -120,7 +122,7 @@ class StompTestSupport extends BrokerFun
     wait_for_receipt(""+rid, c)
   }
 
-  def assert_received(body:Any, sub:String=null, c: StompClient=client)={
+  def assert_received(body:Any, sub:String=null, c: StompClient=client):(Boolean)=>Unit
= {
     val frame = c.receive()
     frame should startWith("MESSAGE\n")
     if(sub!=null) {
@@ -132,13 +134,13 @@ class StompTestSupport extends BrokerFun
       case body => frame should endWith("\n\n"+body)
     }
     // return a func that can ack the message.
-    ()=> {
+    (ack:Boolean)=> {
       val sub_regex = """(?s).*\nsubscription:([^\n]+)\n.*""".r
       val msgid_regex = """(?s).*\nmessage-id:([^\n]+)\n.*""".r
       val sub_regex(sub) = frame
       val msgid_regex(msgid) = frame
       c.write(
-        "ACK\n" +
+        (if(ack) "ACK\n" else "NACK\n") +
         "subscription:"+sub+"\n" +
         "message-id:"+msgid+"\n" +
         "\n")
@@ -299,7 +301,7 @@ class StompMetricsTest extends StompTest
     stat2.metrics.queue_items should be(2)
 
     // Ack now..
-    ack2() ; ack3()
+    ack2(true) ; ack3(true)
 
     within(1, SECONDS) {
       val stat3 = topic_status("queued.stats")
@@ -352,7 +354,7 @@ class StompMetricsTest extends StompTest
     stat2.metrics.queue_items should be(2)
 
     // Ack SOME now..
-    ack2();
+    ack2(true);
 
     within(1, SECONDS) {
       val stat3 = topic_status("dsubed.stats")
@@ -1481,32 +1483,9 @@ class StompMirroredQueueTest extends Sto
 
   test("Topic gets copy of message sent to queue") {
     connect("1.1")
-
-    // Connect to subscribers
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/topic/mirrored.a\n" +
-      "id:1\n" +
-      "receipt:0\n" +
-      "\n")
-    wait_for_receipt("0")
-
-    def put(id:Int) = {
-      client.write(
-        "SEND\n" +
-        "destination:/queue/mirrored.a\n" +
-        "\n" +
-        "message:"+id+"\n")
-    }
-
-    put(1)
-
-    def get(id:Int) = {
-      val frame = client.receive()
-      frame should startWith("MESSAGE\n")
-      frame should endWith regex("\n\nmessage:"+id+"\n")
-    }
-    get(1)
+    subscribe("1", "/topic/mirrored.a")
+    async_send("/queue/mirrored.a", "message:1\n")
+    assert_received("message:1\n")
   }
 
   test("Queue gets copy of message sent to topic") {
@@ -2397,4 +2376,41 @@ class StompUdpInteropTest extends StompT
 
     assert_received("Hello")
   }
+}
+
+class StompNackTest extends StompTestSupport {
+
+  test("NACKing moves messages to DLQ (non-persistent)") {
+    connect("1.1")
+    sync_send("/queue/nacker.a", "this msg is not persistent")
+
+    subscribe("0", "/queue/nacker.a", "client", false, "", false)
+    subscribe("dlq", "/queue/dlq.nacker.a", "auto", false, "", false)
+    var ack = assert_received("this msg is not persistent", "0")
+    ack(false)
+    ack = assert_received("this msg is not persistent", "0")
+    ack(false)
+
+    // It should be sent to the DLQ after the 2nd nak
+    assert_received("this msg is not persistent", "dlq")
+  }
+
+  test("NACKing moves messages to DLQ (persistent)") {
+    connect("1.1")
+    sync_send("/queue/nacker.b", "this msg is persistent", "persistent:true\n")
+
+    subscribe("0", "/queue/nacker.b", "client", false, "", false)
+    subscribe("dlq", "/queue/dlq.nacker.b", "auto", false, "", false)
+    var ack = assert_received("this msg is persistent", "0")
+    ack(false)
+    ack = assert_received("this msg is persistent", "0")
+    ack(false)
+
+    // It should be sent to the DLQ after the 2nd nak
+    assert_received("this msg is persistent", "dlq")
+  }
+}
+
+class StompNackTestOnLevelDBTest extends StompNackTest {
+  override val broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1308214&r1=1308213&r2=1308214&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Sun Apr
 1 23:03:13 2012
@@ -362,6 +362,15 @@ memory.  Defaults to true.
   will be flow controlled once this enqueue rate is reached.  If not set
   then it is disabled
 
+* `dlq`: Is the dead letter queue configured for the destination.  A 
+   dead letter queue is used for storing messages that failed to get processed
+   by consumers.  If not set, then messages that fail to get processed
+   will be dropped.  If '*' appears in the name it will be replaced with 
+   the queue's id.
+
+* `max_enqueue_rate`: The maximum enqueue rate of the queue.  Producers
+  will be flow controlled once this enqueue rate is reached.  If not set
+  then it is disabled
 
 ##### Topics
 



Mime
View raw message