activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1161668 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/...
Date Thu, 25 Aug 2011 17:51:02 GMT
Author: chirino
Date: Thu Aug 25 17:51:02 2011
New Revision: 1161668

URL: http://svn.apache.org/viewvc?rev=1161668&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-75 : Apollo does not set the redelivered
header on redelivered messages

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Thu Aug 25 17:51:02 2011
@@ -140,19 +140,34 @@ object Delivery extends Sizer[Delivery] 
 }
 
 sealed trait DeliveryResult
-/** message was processed, does not need redelivery */
+
+/**
+ * message was delivered and processed, does not need redelivery
+ */
+object Consumed extends DeliveryResult
+
+/**
+ * The message was delivered but not consumed, it should be redelivered to another consumer
ASAP.
+ * The redelivery counter should increment.
+ */
 object Delivered extends DeliveryResult
-/** message expired before it could be processed, does not need redelivery */
+
+/**
+ * The message was not delivered so it should be redelivered to another consumer but not
effect
+ * it's redelivery counter.
+ */
+object Undelivered extends DeliveryResult
+
+/**
+ * message expired before it could be processed, does not need redelivery
+ */
 object Expired extends DeliveryResult
+
 /**
-  * The receiver thinks the message was poison message, it was not successfully
-  * processed and it should not get redelivered..
-  */
+ * The receiver thinks the message was poison message, it was not successfully
+ * processed and it should not get redelivered..
+ */
 object Poisoned extends DeliveryResult
-/**
-  * The message was not consumed, it should be redelivered to another consumer ASAP.
-  */
-object Undelivered extends DeliveryResult
 
 class Delivery {
 
@@ -183,6 +198,11 @@ class Delivery {
   var uow:StoreUOW = null
 
   /**
+   * The number of redeliveries that this message has seen.
+   */
+  var redeliveries:Short = 0
+
+  /**
    * Set if the producer requires an ack to be sent back.  Consumer
    * should execute once the message is processed.
    */
@@ -195,6 +215,7 @@ class Delivery {
     message = other.message
     storeKey = other.storeKey
     storeLocator = other.storeLocator
+    redeliveries = other.redeliveries
     this
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Thu Aug 25 17:51:02 2011
@@ -30,6 +30,7 @@ import security.SecurityContext
 import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO}
 import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicInteger}
 import org.fusesource.hawtbuf.Buffer
+import java.lang.UnsupportedOperationException
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -575,13 +576,19 @@ class Queue(val router: LocalRouter, val
     ack_source.getData.foreach {
       case (entry, consumed, uow) =>
         consumed match {
-          case Delivered   =>
+          case Consumed =>
             entry.ack(uow)
-          case Expired     =>
+          case Expired=>
             entry.entry.queue.expired(entry.entry, false)
             entry.ack(uow)
-          case Poisoned    => entry.nack
-          case Undelivered => entry.nack
+          case Delivered =>
+            entry.entry.redelivered
+            entry.nack
+          case Poisoned    =>
+            entry.entry.redelivered
+            entry.nack
+          case Undelivered =>
+            entry.nack
         }
         if( uow!=null ) {
           uow.release()
@@ -831,7 +838,7 @@ class QueueEntry(val queue:Queue, val se
 
   def init(qer:QueueEntryRecord):QueueEntry = {
     val locator = new AtomicReference[Array[Byte]](Option(qer.message_locator).map(_.toByteArray).getOrElse(null))
-    state = new Swapped(qer.message_key, locator, qer.size, qer.expiration)
+    state = new Swapped(qer.message_key, locator, qer.size, qer.expiration, qer.redeliveries)
     this
   }
 
@@ -927,6 +934,8 @@ class QueueEntry(val queue:Queue, val se
   def count = state.count
   def size = state.size
   def expiration = state.expiration
+  def redeliveries = state.redeliveries
+  def redelivered = state.redelivered
   def messageKey = state.message_key
   def is_swapped_or_swapping_out = state.is_swapped_or_swapping_out
   def dispatch() = state.dispatch
@@ -966,6 +975,16 @@ class QueueEntry(val queue:Queue, val se
     def expiration = 0L
 
     /**
+     * When the entry expires or 0 if it does not expire.
+     */
+    def redeliveries:Short = throw new UnsupportedOperationException()
+
+    /**
+     * Called to increment the redelivery counter
+     */
+    def redelivered:Unit = {}
+
+    /**
      * Gets number of messages that this entry represents
      */
     def count = 0
@@ -1109,6 +1128,9 @@ class QueueEntry(val queue:Queue, val se
     override def expiration = delivery.message.expiration
     override def message_key = delivery.storeKey
     override def message_locator = delivery.storeLocator
+    override def redeliveries = delivery.redeliveries
+
+    override def redelivered = delivery.redeliveries = ((delivery.redeliveries+1).min(Short.MaxValue)).toShort
 
     var remove_pending = false
 
@@ -1184,7 +1206,7 @@ class QueueEntry(val queue:Queue, val se
         queue.swap_out_size_counter += size
         queue.swap_out_item_counter += 1
 
-        state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration)
+        state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redeliveries)
         if( can_combine_with_prev ) {
           getPrevious.as_swapped_range.combineNext
         }
@@ -1332,12 +1354,14 @@ class QueueEntry(val queue:Queue, val se
    * entry is persisted, it can move into this state.  This state only holds onto the
    * the massage key so that it can reload the message from the store quickly when needed.
    */
-  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Array[Byte]],
override val size:Int, override val expiration:Long) extends EntryState {
+  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Array[Byte]],
override val size:Int, override val expiration:Long, var _redeliveries:Short) extends EntryState
{
 
     queue.individual_swapped_items += 1
 
     var swapping_in = false
 
+    override def redeliveries = _redeliveries
+    override def redelivered = _redeliveries = ((_redeliveries+1).min(Short.MaxValue)).toShort
 
     override def count = 1
 
@@ -1390,6 +1414,7 @@ class QueueEntry(val queue:Queue, val se
         delivery.size = messageRecord.size
         delivery.storeKey = messageRecord.key
         delivery.storeLocator = messageRecord.locator
+        delivery.redeliveries = redeliveries
 
         queue.swapped_in_size += delivery.size
         queue.swapped_in_items += 1
@@ -1629,6 +1654,7 @@ class Subscription(val queue:Queue, val 
       while( next !=null ) {
         val cur = next;
         next = next.getNext
+        cur.entry.redelivered
         cur.nack // this unlinks the entry.
       }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Thu Aug 25 17:51:02 2011
@@ -186,11 +186,11 @@ abstract class DeliveryProducerRoute(rou
       if (delivery.uow != null) {
         val ack = pendingAck
         delivery.uow.on_complete {
-          ack(Delivered, null)
+          ack(Consumed, null)
         }
 
       } else {
-        pendingAck(Delivered, null)
+        pendingAck(Consumed, null)
       }
       pendingAck==null
     }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Thu Aug 25 17:51:02 2011
@@ -983,12 +983,12 @@ class OpenwireProtocolHandler extends Pr
 
         val msgid = messageAck.getLastMessageId
         val consumed = messageAck.getAckType match {
-          case MessageAck.DELIVERED_ACK_TYPE => Delivered
-          case MessageAck.INDIVIDUAL_ACK_TYPE => Delivered
-          case MessageAck.STANDARD_ACK_TYPE => Delivered
+          case MessageAck.DELIVERED_ACK_TYPE => Consumed
+          case MessageAck.INDIVIDUAL_ACK_TYPE => Consumed
+          case MessageAck.STANDARD_ACK_TYPE => Consumed
           case MessageAck.POSION_ACK_TYPE => Poisoned
-          case MessageAck.REDELIVERED_ACK_TYPE => Undelivered
-          case MessageAck.UNMATCHED_ACK_TYPE => Delivered
+          case MessageAck.REDELIVERED_ACK_TYPE => Delivered
+          case MessageAck.UNMATCHED_ACK_TYPE => Consumed
         }
 
         if( messageAck.getAckType == MessageAck.INDIVIDUAL_ACK_TYPE) {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Thu Aug 25 17:51:02 2011
@@ -39,6 +39,7 @@ import org.apache.activemq.apollo.transp
 import java.security.cert.X509Certificate
 import collection.mutable.{ListBuffer, HashMap}
 import java.io.IOException
+import collection.immutable.List._
 
 
 case class RichBuffer(self:Buffer) extends Proxy {
@@ -190,7 +191,7 @@ class StompProtocolHandler extends Proto
 
       def track(delivery:Delivery) = {
         if( delivery.ack!=null ) {
-          delivery.ack(Delivered, null)
+          delivery.ack(Consumed, null)
         }
         ack_source.merge((delivery.size, 1))
       }
@@ -336,6 +337,11 @@ class StompProtocolHandler extends Proto
       if( subscription_id != None ) {
         frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
       }
+      if( config.add_redeliveries_header!=null && delivery.redeliveries > 0) {
+        val header = encode_header(config.add_redeliveries_header)
+        val value = ascii(delivery.redeliveries.toString())
+        frame = frame.append_headers((header, value)::Nil)
+      }
       frame
     }, Delivery)
 
@@ -774,7 +780,7 @@ class StompProtocolHandler extends Proto
 
       connected_headers += SERVER->encode_header("apache-apollo/"+Broker.version)
 
-      session_id = encode_header("%s-%x".format(this.host.config.id, this.host.session_counter.incrementAndGet))
+      session_id = encode_header("%s-%x-".format(this.host.config.id, this.host.session_counter.incrementAndGet))
       connected_headers += SESSION->session_id
 
       val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
@@ -1160,11 +1166,11 @@ class StompProtocolHandler extends Proto
   }
 
   def on_stomp_ack(frame:StompFrame):Unit = {
-    on_stomp_ack(frame.headers, Delivered)
+    on_stomp_ack(frame.headers, Consumed)
   }
 
   def on_stomp_nack(frame:StompFrame):Unit = {
-    on_stomp_ack(frame.headers, Undelivered)
+    on_stomp_ack(frame.headers, Delivered)
   }
 
   def on_stomp_ack(headers:HeaderMap, consumed:DeliveryResult):Unit = {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
Thu Aug 25 17:51:02 2011
@@ -43,11 +43,19 @@ public class StompDTO extends ProtocolDT
 
     /**
      * If set, it will add the configured header name with the value
-     * set the a timestamp of when the message is recieved.
+     * set the a timestamp of when the message is received.
      */
     @XmlAttribute(name="add_timestamp_header")
     public String add_timestamp_header;
 
+    /**
+     * If set, the configured header will be added to message
+     * sent to consumer if the message is a redelivery.  It will be
+     * set to the number of re-deliveries that have occurred.
+     */
+    @XmlAttribute(name="add_redeliveries_header")
+    public String add_redeliveries_header;
+
     @XmlAttribute(name="max_header_length")
     public Integer max_header_length;
 

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Thu Aug
25 17:51:02 2011
@@ -973,6 +973,10 @@ following configuration attributes:
   message received.  The value of the header will be set to the time the message
   was received.  The time will be represented as the number of milliseconds elapsed
   since the UNIX epoch in GMT.  Not set by default.
+* `add_redeliveries_header` :  Name of the header which will be added to messages
+  sent to consumers if the messages has been redelivered.  The value of the header 
+  will be set to the number of times the message has been redeliverd.  Not set 
+  by default.
 * `max_header_length` : The maximum allowed length of a STOMP header. Defaults 
   to 10240 (10k).
 * `max_headers` : The maximum number of allowed headers in a frame.  Defaults 



Mime
View raw message