activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1230193 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Date Wed, 11 Jan 2012 19:11:17 GMT
Author: chirino
Date: Wed Jan 11 19:11:17 2012
New Revision: 1230193

URL: http://svn.apache.org/viewvc?rev=1230193&view=rev
Log:
Simplify the requirements of a message (removed fields which were not needed).  Also have
the delivery track the sending destination.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Wed Jan 11 19:11:17 2012
@@ -23,6 +23,7 @@ import org.apache.activemq.apollo.filter
 import org.apache.activemq.apollo.broker.store.StoreUOW
 import org.apache.activemq.apollo.util.Log
 import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
+import org.apache.activemq.apollo.dto.DestinationDTO
 
 object DeliveryProducer extends Log
 
@@ -95,16 +96,6 @@ trait DeliverySession extends SessionSin
 trait Message extends Filterable with Retained {
 
   /**
-   * the globally unique id of the message
-   */
-  def id: AsciiBuffer
-
-  /**
-   * the globally unique id of the producer
-   */
-  def producer: AsciiBuffer
-
-  /**
    *  the message priority.
    */
   def priority:Byte
@@ -175,6 +166,11 @@ object Poisoned extends DeliveryResult
 class Delivery {
 
   /**
+   * Where the delivery is originating from.
+   */
+  var sender:DestinationDTO = _
+
+  /**
    * Total size of the delivery.  Used for resource allocation tracking
    */
   var size:Int = 0
@@ -219,6 +215,7 @@ class Delivery {
   def copy() = (new Delivery).set(this)
 
   def set(other:Delivery) = {
+    sender = other.sender
     size = other.size
     seq = other.seq
     message = other.message
@@ -230,7 +227,7 @@ class Delivery {
 
   def createMessageRecord() = {
     val record = message.protocol.encode(message)
-    assert( record.size == size )
+    record.size = size
     record.locator = storeLocator
     record
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Wed Jan 11 19:11:17 2012
@@ -527,6 +527,7 @@ class Queue(val router: LocalRouter, val
         val entry = tail_entry
         tail_entry = new QueueEntry(Queue.this, next_message_seq)
         val queueDelivery = delivery.copy
+        queueDelivery.sender = destination_dto
         queueDelivery.seq = entry.seq
         entry.init(queueDelivery)
         

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Wed Jan 11 19:11:17 2012
@@ -131,7 +131,11 @@ class Topic(val router:LocalRouter, val 
     def producer = session.producer
     def consumer = session.consumer
 
-    def offer(value: Delivery) = downstream.offer(value)
+    def offer(value: Delivery) = {
+      val copy = value.copy();
+      copy.sender = destination_dto
+      downstream.offer(copy)
+    }
   }
 
   case class ProxyDeliveryConsumer(consumer:DeliveryConsumer, link:LinkDTO, registered:DeliveryConsumer) extends DeliveryConsumer {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
Wed Jan 11 19:11:17 2012
@@ -41,14 +41,10 @@ class OpenwireMessage(val message:Active
 
   def protocol = OpenwireProtocol
 
-  def producer = ascii(message.getProducerId.toString)
-
   def priority = message.getPriority
 
   def persistent = message.isPersistent
 
-  def id = _id
-
   def expiration = message.getExpiration
 
   def getBodyAs[T](toType : Class[T]) = {
@@ -89,10 +85,8 @@ object EndOfBrowseMessage extends Messag
   def retain() {}
   def release() {}
   def protocol: Protocol = null
-  def producer: AsciiBuffer = null
   def priority: Byte = 0
   def persistent: Boolean = false
-  def id: AsciiBuffer = null
   def expiration: Long = 0L
   def getProperty(name: String): AnyRef = null
   def getLocalConnectionId: AnyRef = null

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Wed Jan 11 19:11:17 2012
@@ -42,7 +42,6 @@ object StompCodec extends Log {
 
     val rc = new MessageRecord
     rc.protocol = PROTOCOL
-    rc.size = frame.size
     rc.expiration = message.expiration
 
     if( frame.content.isInstanceOf[ZeroCopyContent] ) {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Wed Jan 11 19:11:17 2012
@@ -45,11 +45,6 @@ case class StompFrameMessage(frame:Stomp
   var id: AsciiBuffer = null
 
   /**
-   * the globally unique id of the producer
-   */
-  var producer: AsciiBuffer = null
-
-  /**
    *  the message priority.
    */
   var priority:Byte = 4;

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Wed Jan 11 19:11:17 2012
@@ -128,6 +128,10 @@ class StompProtocolHandler extends Proto
   }
 
   protected def dispatchQueue:DispatchQueue = connection.dispatch_queue
+  
+  def id(message:Message) = {
+    message.asInstanceOf[StompFrameMessage].id
+  }
 
   class StompConsumer (
 
@@ -249,9 +253,9 @@ class StompProtocolHandler extends Proto
         } else {
           if( protocol_version eq V1_0 ) {
             // register on the connection since 1.0 acks may not include the subscription id
-            connection_ack_handlers += ( delivery.message.id-> this )
+            connection_ack_handlers += ( id(delivery.message) -> this )
           }
-          consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack )
+          consumer_acks += id(delivery.message) -> new TrackedAck(Some(delivery.size), delivery.ack )
         }
       }
 
@@ -337,9 +341,9 @@ class StompProtocolHandler extends Proto
         } else {
           if( protocol_version eq V1_0 ) {
             // register on the connection since 1.0 acks may not include the subscription id
-            connection_ack_handlers += ( delivery.message.id-> this )
+            connection_ack_handlers += ( id(delivery.message) -> this )
           }
-          consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack)
+          consumer_acks += id(delivery.message) -> new TrackedAck(Some(delivery.size), delivery.ack)
         }
       }
 
@@ -555,7 +559,7 @@ class StompProtocolHandler extends Proto
   var codec:StompCodec = _
 
   implicit def toDestinationDTO(value:AsciiBuffer):Array[DestinationDTO] = {
-    val rc = destination_parser.decode_destination(value.toString)
+    val rc = destination_parser.decode_multi_destination(value.toString)
     if( rc==null ) {
       throw new ProtocolException("Invalid stomp destination name: "+value);
     }



Mime
View raw message