activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1240841 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main: proto/ scala/org/apache/activemq/apollo/broker/ scala/org/apache/activemq/apollo/broker/store/
Date Sun, 05 Feb 2012 21:54:23 GMT
Author: chirino
Date: Sun Feb  5 21:54:22 2012
New Revision: 1240841

URL: http://svn.apache.org/viewvc?rev=1240841&view=rev
Log:
Store the original sender in the queue entry if it's not the current queue.  Allows consumers
on durable subs to find out which topic the message originated from.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1240841&r1=1240840&r2=1240841&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Sun Feb  5 21:54:22 2012
@@ -50,6 +50,7 @@ message QueueEntryPB {
   optional int32 redeliveries = 6;
   optional sint64 expiration=7;
   optional bytes messageLocator=8;
+  optional bytes sender=9;
 }
 
 message MapEntryPB {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1240841&r1=1240840&r2=1240841&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sun Feb  5 21:54:22 2012
@@ -31,6 +31,7 @@ import java.lang.UnsupportedOperationExc
 import security.SecuredResource._
 import security.{SecuredResource, SecurityContext}
 import org.apache.activemq.apollo.dto._
+import org.fusesource.hawtbuf.UTF8Buffer
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -556,7 +557,6 @@ class Queue(val router: LocalRouter, val
         val entry = tail_entry
         tail_entry = new QueueEntry(Queue.this, next_message_seq)
         val queue_delivery = delivery.copy
-        queue_delivery.sender = address
         queue_delivery.seq = entry.seq
         entry.init(queue_delivery)
         
@@ -1131,7 +1131,12 @@ class QueueEntry(val queue:Queue, val se
   }
 
   def init(qer:QueueEntryRecord):QueueEntry = {
-    state = new Swapped(qer.message_key, qer.message_locator, qer.size, qer.expiration, qer.redeliveries, null)
+    val sender = if ( qer.sender==null ) {
+      null
+    } else {
+      SimpleAddress(qer.sender.utf8().toString)
+    }
+    state = new Swapped(qer.message_key, qer.message_locator, qer.size, qer.expiration, qer.redeliveries, null, sender)
     this
   }
 
@@ -1191,8 +1196,12 @@ class QueueEntry(val queue:Queue, val se
     qer.entry_seq = seq
     qer.message_key = state.message_key
     qer.message_locator = state.message_locator
+    qer.message_locator = state.message_locator
     qer.size = state.size
     qer.expiration = expiration
+    if( state.sender!=null ) {
+      qer.sender = new UTF8Buffer(state.sender.toString)
+    }
     qer
   }
 
@@ -1297,6 +1306,8 @@ class QueueEntry(val queue:Queue, val se
 
     def message_locator: AtomicReference[Object] = null
 
+    def sender: DestinationAddress = null
+
     /**
      * Attempts to dispatch the current entry to the subscriptions position at the entry.
      * @returns true if at least one subscription advanced to the next entry as a result of dispatching.
@@ -1438,6 +1449,7 @@ class QueueEntry(val queue:Queue, val se
     override def message_key = delivery.storeKey
     override def message_locator = delivery.storeLocator
     override def redelivery_count = delivery.redeliveries
+    override def sender = delivery.sender
 
     override def redelivered = delivery.redeliveries = ((delivery.redeliveries+1).min(Short.MaxValue)).toShort
 
@@ -1523,7 +1535,7 @@ class QueueEntry(val queue:Queue, val se
           queue.swap_out_item_counter += 1
         }
 
-        state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count, acquirer)
+        state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count, acquirer, sender)
         if( can_combine_with_prev ) {
           getPrevious.as_swapped_range.combineNext
         }
@@ -1581,6 +1593,18 @@ class QueueEntry(val queue:Queue, val se
       var heldBack = ListBuffer[Subscription]()
       var advancing = ListBuffer[Subscription]()
 
+      // avoid doing the copy if its' not needed.
+      var _browser_copy:Delivery = null
+      def browser_copy = {
+        if( _browser_copy==null ) {
+          _browser_copy = delivery.copy
+          if( _browser_copy.sender==null ) {
+            _browser_copy.sender = queue.address
+          }
+        }
+        _browser_copy
+      }
+
       var acquiringSub: Subscription = null
       parked.foreach{ sub=>
 
@@ -1589,7 +1613,7 @@ class QueueEntry(val queue:Queue, val se
             // advance: not interested.
             advancing += sub
           } else {
-            if (sub.offer(delivery)) {
+            if (sub.offer(browser_copy)) {
               // advance: accepted...
               advancing += sub
             } else {
@@ -1627,6 +1651,10 @@ class QueueEntry(val queue:Queue, val se
 
                   val acquiredQueueEntry = sub.acquire(entry)
                   val acquiredDelivery = delivery.copy
+                  if( acquiredDelivery.sender==null ) {
+                    acquiredDelivery.sender = queue.address
+                  }
+
                   acquiredDelivery.ack = (consumed, uow)=> {
                     if( uow!=null ) {
                       uow.retain()
@@ -1676,7 +1704,7 @@ class QueueEntry(val queue:Queue, val se
    * entry is persisted, it can move into this state.  This state only holds onto the
    * the massage key so that it can reload the message from the store quickly when needed.
    */
-  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription) extends EntryState {
+  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription, override  val sender:DestinationAddress) extends EntryState {
 
     queue.individual_swapped_items += 1
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1240841&r1=1240840&r2=1240841&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Sun Feb  5 21:54:22 2012
@@ -141,6 +141,12 @@ sealed trait DestinationAddress {
 }
 sealed trait ConnectAddress extends DestinationAddress
 sealed trait BindAddress extends DestinationAddress
+object SimpleAddress {
+  def apply(value:String):SimpleAddress= {
+    val p = value.indexOf(":")
+    SimpleAddress(value.substring(0, p), DestinationAddress.decode_path(value.substring(p+1)))
+  }
+}
 case class SimpleAddress(val domain:String, val path:Path) extends ConnectAddress with BindAddress {
   override def simple = this
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1240841&r1=1240840&r2=1240841&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Sun Feb  5 21:54:22 2012
@@ -90,6 +90,7 @@ object PBSupport {
       pb.setExpiration(v.expiration)
     if(v.redeliveries!=0)
       pb.setRedeliveries(v.redeliveries)
+    pb.setSender(v.sender)
     pb
   }
 
@@ -102,6 +103,7 @@ object PBSupport {
     rc.size = pb.getSize
     rc.expiration = pb.getExpiration
     rc.redeliveries = pb.getRedeliveries.toShort
+    rc.sender = pb.getSender
     rc
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?rev=1240841&r1=1240840&r2=1240841&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala Sun Feb  5 21:54:22 2012
@@ -35,5 +35,6 @@ class QueueEntryRecord {
   var size = 0
   var expiration = 0L
   var redeliveries:Short = 0
+  var sender:Buffer = _
 
 }



Mime
View raw message