activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1416613 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-distro/src/main/descriptors/
Date Mon, 03 Dec 2012 18:18:22 GMT
Author: chirino
Date: Mon Dec  3 18:18:21 2012
New Revision: 1416613

URL: http://svn.apache.org/viewvc?rev=1416613&view=rev
Log:
Implement selector filtering for AMQP messages, and properly close consumers.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-distro/src/main/descriptors/common-bin.xml

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala?rev=1416613&r1=1416612&r2=1416613&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala Mon Dec  3 18:18:21 2012
@@ -28,6 +28,9 @@ import org.fusesource.hawtbuf.Buffer
 import org.fusesource.hawtbuf.AsciiBuffer
 import org.fusesource.hawtbuf.UTF8Buffer
 import org.apache.qpid.proton.hawtdispatch.impl.DroppingWritableBuffer
+import org.apache.qpid.proton.`type`.{UnsignedLong, UnsignedInteger}
+import org.apache.qpid.proton.`type`.messaging.{Properties, Header}
+import org.apache.activemq.apollo.filter.Filterable
 
 object AmqpMessageCodecFactory extends MessageCodecFactory.Provider {
   def create = Array[MessageCodec](AmqpMessageCodec)
@@ -55,6 +58,18 @@ object AmqpMessageCodec extends MessageC
 
 object AmqpMessage {
   val SENDER_CONTAINER_KEY = "sender-container"
+
+  val prefixVendor = "JMS_AMQP_";
+  val prefixDeliveryAnnotationsKey = prefixVendor+"DA_";
+  val prefixMessageAnnotationsKey= prefixVendor+"MA_";
+  val prefixFooterKey = prefixVendor+"FT_";
+
+  val firstAcquirerKey = prefixVendor + "FirstAcquirer";
+  val subjectKey =  prefixVendor +"Subject";
+  val contentTypeKey = prefixVendor +"ContentType";
+  val contentEncodingKey = prefixVendor +"ContentEncoding";
+  val replyToGroupIDKey = prefixVendor +"ReplyToGroupID";
+
 }
 import AmqpMessage._
 
@@ -121,7 +136,7 @@ class AmqpMessage(private var encoded_bu
     }
   }
 
-  def getProperty(name: String) = {
+  def getApplicationProperty(name:String) = {
     if( decoded.getApplicationProperties !=null ) {
       decoded.getApplicationProperties.getValue.get(name).asInstanceOf[AnyRef]
     } else {
@@ -129,6 +144,110 @@ class AmqpMessage(private var encoded_bu
     }
   }
 
+  def getMessageAnnotationProperty(name:String) = {
+    if( decoded.getMessageAnnotations !=null ) {
+      var ma = decoded.getMessageAnnotations
+      var rc = ma.getValue.get(name)
+      if( rc == null ) {
+        rc = ma.getValue.get(org.apache.qpid.proton.`type`.Symbol.valueOf(name))
+      }
+      rc.asInstanceOf[AnyRef]
+    } else {
+      null
+    }
+  }
+
+  def getDeliveryAnnotationProperty(name:String) = {
+    if( decoded.getDeliveryAnnotations !=null ) {
+      decoded.getDeliveryAnnotations.getValue.get(name).asInstanceOf[AnyRef]
+    } else {
+      null
+    }
+  }
+  def getFooterProperty(name:String) = {
+    if( decoded.getFooter !=null ) {
+      decoded.getFooter.getValue.get(name).asInstanceOf[AnyRef]
+    } else {
+      null
+    }
+  }
+
+  def getHeader[T](default:T)(func: (Header)=>T) = {
+    if( decoded.getHeader == null ) {
+      default
+    } else {
+      func(decoded.getHeader)
+    }
+  }
+  def getProperties[T](default:T)(func: (Properties)=>T) = {
+    if( decoded.getProperties == null ) {
+      default
+    } else {
+      func(decoded.getProperties)
+    }
+  }
+
+//  object JMSFilterable extends Filterable {
+//    def getBodyAs[T](kind: Class[T]): T = AmqpMessage.this.getBodyAs(kind)
+//    def getLocalConnectionId: AnyRef = AmqpMessage.this.getLocalConnectionId
+//    def getProperty(name: String) = {
+//    }
+//  }
+
+  def getProperty(name: String) = {
+    val rc:AnyRef = (name match {
+      case "JMSDeliveryMode" =>
+        getHeader[AnyRef](null)(header=> if(header.getDurable) "PERSISTENT" else "NON_PERSISTENT" )
+      case "JMSPriority" =>
+        new java.lang.Integer(decoded.getPriority)
+      case "JMSType" =>
+        getMessageAnnotationProperty("x-opt-jms-type")
+      case "JMSMessageID" =>
+        getProperties[AnyRef](null)(_.getMessageId)
+      case "JMSDestination" =>
+        getProperties[String](null)(_.getTo)
+      case "JMSReplyTo" =>
+        getProperties[String](null)(_.getReplyTo)
+      case "JMSCorrelationID" =>
+        getProperties[AnyRef](null)(_.getCorrelationId)
+//      case "JMSExpiration" =>
+//        new java.lang.Long(decoded.getTtl)
+      case "JMSExpiration" =>
+        getProperties[AnyRef](null)(x=> Option(x.getAbsoluteExpiryTime()).map(y=> new java.lang.Long(y.getTime)).getOrElse(null))
+      case "JMSXDeliveryCount" =>
+        getHeader[AnyRef](null)(_.getDeliveryCount)
+      case "JMSXUserID" =>
+        getProperties[AnyRef](null)(_.getUserId)
+      case "JMSXGroupID" =>
+        getProperties[AnyRef](null)(_.getGroupId)
+      case "JMSXGroupSeq" =>
+        getProperties[AnyRef](null)(_.getGroupSequence)
+      case x if x == firstAcquirerKey =>
+        getHeader[AnyRef](null)(_.getFirstAcquirer)
+      case x if x == subjectKey =>
+        getProperties[AnyRef](null)(_.getSubject)
+      case x if x == contentTypeKey =>
+        getProperties[AnyRef](null)(_.getContentType)
+      case x if x == contentEncodingKey =>
+        getProperties[AnyRef](null)(_.getContentEncoding)
+      case x if x == replyToGroupIDKey =>
+        getProperties[AnyRef](null)(_.getReplyToGroupId)
+      case x if x.startsWith(prefixDeliveryAnnotationsKey) =>
+        getDeliveryAnnotationProperty(x)
+      case x if x.startsWith(prefixMessageAnnotationsKey)  =>
+        getMessageAnnotationProperty(x)
+      case x if x.startsWith(prefixFooterKey)  =>
+        getFooterProperty(x)
+      case x =>
+        getApplicationProperty(x)
+    }) match {
+      case x:UnsignedInteger => new java.lang.Long(x.longValue());
+      case x:UnsignedLong => new java.lang.Long(x.longValue());
+      case x => x
+    }
+    rc
+  }
+
   def release() {}
   def retain() {}
   def retained(): Int = 0

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1416613&r1=1416612&r2=1416613&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Mon Dec  3 18:18:21 2012
@@ -177,7 +177,7 @@ class AmqpProtocolHandler extends Protoc
   }
 
   var amqp_connection:AmqpTransport = _
-  var amqp_trace = false
+  var amqp_trace = true
 
   def codec = connection.transport.getProtocolCodec.asInstanceOf[AmqpProtocolCodec]
 
@@ -405,6 +405,18 @@ class AmqpProtocolHandler extends Protoc
       amqp_connection.context(endpoint).setAttachment(value)
     }
 
+    def processSenderClose(sender: Sender, onComplete: Task) = {
+      get_attachment(sender) match {
+        case null =>
+          sender.close()
+          onComplete.run()
+        case consumer: AmqpConsumer =>
+          // Lets disconnect the route.
+          set_attachment(sender, null)
+          consumer.close
+      }
+    }
+
     def processReceiverClose(receiver: Receiver, onComplete: Task) {
       get_attachment(receiver) match {
         case null =>
@@ -445,6 +457,7 @@ class AmqpProtocolHandler extends Protoc
       val (address, requested_addresses, actual) = decode_source(source)
       sender.setSource(actual);
       if (requested_addresses == null) {
+        sender.setSource(null)
         close_with_error(sender, "invalid-address", "Invaild address: " + address)
         onComplete.run()
         return
@@ -459,6 +472,7 @@ class AmqpProtocolHandler extends Protoc
             (selector, SelectorParser.parse(selector))
           } catch {
             case e: FilterException =>
+              sender.setSource(null)
               close_with_error(sender, "amqp:invalid-field", "Invalid selector expression '%s': %s".format(selector, e.getMessage))
               onComplete.run()
               return
@@ -485,6 +499,7 @@ class AmqpProtocolHandler extends Protoc
       }
 
       if (from_seq_opt.isDefined && is_multi_destination) {
+        sender.setSource(null)
         close_with_error(sender, "invalid-from-seq", "The from-seq header is only supported when you subscribe to one destination")
         onComplete.run()
         return
@@ -500,6 +515,7 @@ class AmqpProtocolHandler extends Protoc
               case "dsub" => dsubs += address
               case "topic" => topics += address
               case _ =>
+                sender.setSource(null)
                 close_with_error(sender, "invalid-from-seq", "A durable link can only be used on a topic destination")
                 onComplete.run()
                 return
@@ -518,7 +534,7 @@ class AmqpProtocolHandler extends Protoc
 
       link_counter += 1
       val id = link_counter
-      val consumer = new AmqpConsumer(sender, link_counter, requested_addresses, presettle, selector, browser, exclusive, include_seq, from_seq, browser_end);
+      val consumer = new AmqpConsumer(sender, id, requested_addresses, presettle, selector, browser, exclusive, include_seq, from_seq, browser_end);
       consumers += (id -> consumer)
 
       host.dispatch_queue {
@@ -528,6 +544,7 @@ class AmqpProtocolHandler extends Protoc
             case Some(reason) =>
               consumers -= id
               consumer.release
+              sender.setSource(null)
               close_with_error(sender, "subscribe-failed", reason)
               onComplete.run()
             case None =>
@@ -539,11 +556,6 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
-    def processSenderClose(sender: Sender, onComplete: Task) {
-      sender.close()
-      onComplete.run()
-    }
-
     var gracefully_closed = false
     override def processFailure(e: Throwable) {
       var msg = "Internal Server Error: " + e
@@ -820,7 +832,17 @@ class AmqpProtocolHandler extends Protoc
     override def connection = Option(AmqpProtocolHandler.this.connection)
 
     def is_persistent = false
-    def matches(message: Delivery) = true
+    def matches(delivery: Delivery) = {
+      if( delivery.message.codec eq AmqpMessageCodec ) {
+        if( selector!=null ) {
+          selector._2.matches(delivery.message)
+        } else {
+          true
+        }
+      } else {
+        false
+      }
+    }
     override def start_from_tail = from_seq == -1
 
     override def jms_selector = if (selector != null) {
@@ -837,6 +859,17 @@ class AmqpProtocolHandler extends Protoc
       starting_seq = seq
     }
 
+    def isSenderClosed = {
+      sender.getLocalState == EndpointState.CLOSED
+    }
+
+    def close = {
+      consumers -= subscription_id
+      host.dispatch_queue {
+        host.router.unbind(addresses, this, false , security_context)
+        release()
+      }
+    }
 
     var nextTagId = 0L;
     val tagCache = new util.HashSet[Array[Byte]]();
@@ -1063,6 +1096,9 @@ class AmqpProtocolHandler extends Protoc
         reject(next, Undelivered)
         next = session_manager.poll
       }
+
+      sender.close()
+      pump_out
       super.dispose()
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-distro/src/main/descriptors/common-bin.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-distro/src/main/descriptors/common-bin.xml?rev=1416613&r1=1416612&r2=1416613&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-distro/src/main/descriptors/common-bin.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-distro/src/main/descriptors/common-bin.xml Mon Dec  3 18:18:21 2012
@@ -95,6 +95,7 @@
         <include>org.apache.commons:commons-math</include>
         <include>org.fusesource.stompjms:stompjms-client</include>
         <include>org.apache.qpid:proton</include>
+        <include>org.apache.qpid:proton-hawtdispatch</include>
 
       </includes>
       <excludes>



Mime
View raw message