activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1421319 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/secur...
Date Thu, 13 Dec 2012 14:47:35 GMT
Author: chirino
Date: Thu Dec 13 14:47:30 2012
New Revision: 1421319

URL: http://svn.apache.org/viewvc?rev=1421319&view=rev
Log:
Support the noLocal option over AMQP 1.0.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala?rev=1421319&r1=1421318&r2=1421319&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
Thu Dec 13 14:47:30 2012
@@ -164,7 +164,7 @@ class AmqpMessage(private var encoded_bu
       null
     }
   }
-  def getFooterProperty(name:String) = {
+  def getFooterProperty(name:AnyRef) = {
     if( decoded.getFooter !=null ) {
       decoded.getFooter.getValue.get(name).asInstanceOf[AnyRef]
     } else {

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1421319&r1=1421318&r2=1421319&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
Thu Dec 13 14:47:30 2012
@@ -74,6 +74,8 @@ object AmqpProtocolHandler extends Log {
 
   val JMS_SELECTOR = AmqpSymbol.valueOf("jms-selector")
   val NO_LOCAL = AmqpSymbol.valueOf("no-local");
+  val ORIGIN = AmqpSymbol.valueOf("origin");
+
   val DURABLE = new UnsignedInteger(2);
 
   val EMPTY_BYTE_ARRAY = Array[Byte]()
@@ -328,7 +330,7 @@ class AmqpProtocolHandler extends Protoc
 
 
     def processConnectionOpen(conn: engine.Connection, onComplete: Task) {
-      security_context.session_id = Some(conn.getRemoteContainer())
+      security_context.remote_application = Some(conn.getRemoteContainer())
 
       suspend_read("host lookup")
       broker.dispatch_queue {
@@ -349,6 +351,7 @@ class AmqpProtocolHandler extends Protoc
             connection_log = virtual_host.connection_log
             host = virtual_host
             proton.setLocalContainerId(virtual_host.id)
+            security_context.session_id = Some("%s-%x".format(host.config.id, host.session_counter.incrementAndGet))
             //                proton.open()
             //                callback.onSuccess(response)
             if (virtual_host.authenticator != null && virtual_host.authorizer !=
null) {
@@ -489,7 +492,7 @@ class AmqpProtocolHandler extends Protoc
       if( source == null ) {
         // Source get set to null when a durable sub is being ended.
         source = new org.apache.qpid.proton.`type`.messaging.Source();
-        source.setAddress("/dsub/"+sender.getName);
+        source.setAddress("dsub://"+sender.getName);
         source.setDurable(DURABLE)
         source.setExpiryPolicy(TerminusExpiryPolicy.NEVER)
         sender.setSource(source);
@@ -499,17 +502,18 @@ class AmqpProtocolHandler extends Protoc
       sender.setSource(actual);
       if (requested_addresses == null) {
         sender.setSource(null)
-        close_with_error(sender, "invalid-address", "Invaild address: " + address)
+        close_with_error(sender, "amqp:not-found", "Invaild address: " + address)
         onComplete.run()
         return
       }
 
+      var noLocal = false
       val filter = source.getFilter()
       val selector = if (filter != null) {
         var value = filter.get(NO_LOCAL).asInstanceOf[DescribedType];
         if( value!=null ) {
           // TODO: setup a no-local filter.
-//            consumerInfo.setNoLocal(true);
+          noLocal = true
         }
         value = filter.get(JMS_SELECTOR).asInstanceOf[DescribedType]
         if (value != null) {
@@ -532,25 +536,12 @@ class AmqpProtocolHandler extends Protoc
 
       val presettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
 
-      var browser = source.getDistributionMode() == COPY
-      var browser_end = browser && true
-      var exclusive = !browser && false
-      var include_seq: Option[Long] = None
-      val from_seq_opt: Option[Long] = None
-
       def is_multi_destination = if (requested_addresses.length > 1) {
         true
       } else {
         PathParser.containsWildCards(requested_addresses(0).path)
       }
 
-      if (from_seq_opt.isDefined && is_multi_destination) {
-        sender.setSource(null)
-        close_with_error(sender, "invalid-from-seq", "The from-seq header is only supported
when you subscribe to one destination")
-        onComplete.run()
-        return
-      }
-
       val persistent = DURABLE == source.getDurable() && source.getExpiryPolicy ==
TerminusExpiryPolicy.NEVER
       val addresses: Array[_ <: BindAddress] = if (persistent) {
         val dsubs = ListBuffer[BindAddress]()
@@ -577,12 +568,24 @@ class AmqpProtocolHandler extends Protoc
         requested_addresses
       }
 
+      var browser = addresses.find(_.domain != "queue").isEmpty && (source.getDistributionMode()
== COPY)
+      var browser_end = false
+      var exclusive = !browser && false
+      var include_seq: Option[Long] = None
+      val from_seq_opt: Option[Long] = None
+
+      if (from_seq_opt.isDefined && is_multi_destination) {
+        sender.setSource(null)
+        close_with_error(sender, "invalid-from-seq", "The from-seq header is only supported
when you subscribe to one destination")
+        onComplete.run()
+        return
+      }
       val from_seq = from_seq_opt.getOrElse(0L)
 
 
       link_counter += 1
       val id = link_counter
-      val consumer = new AmqpConsumer(sender, id, addresses, presettle, selector, browser,
exclusive, include_seq, from_seq, browser_end);
+      val consumer = new AmqpConsumer(sender, id, addresses, presettle, selector, noLocal,
browser, exclusive, include_seq, from_seq, browser_end);
       consumers += (id -> consumer)
 
       host.dispatch_queue {
@@ -593,7 +596,7 @@ class AmqpProtocolHandler extends Protoc
               consumers -= id
               consumer.release
               sender.setSource(null)
-              close_with_error(sender, "subscribe-failed", reason)
+              close_with_error(sender, "amqp:not-found", reason)
               onComplete.run()
             case None =>
               set_attachment(sender, consumer)
@@ -805,7 +808,21 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
-    def onMessage(receiver:Receiver, delivery: DeliveryImpl, message: AmqpMessage) = {
+    def onMessage(receiver:Receiver, delivery: DeliveryImpl, m: AmqpMessage) = {
+
+      // Update the message to attach some producer context to the footer..
+      // of the message.
+      val dm = m.decoded
+      val footer_map:java.util.Map[AnyRef,AnyRef] = if( dm.getFooter == null ) {
+        val map = new java.util.HashMap[AnyRef,AnyRef]
+        dm.setFooter(new Footer(map))
+        map
+      } else {
+        dm.getFooter.getValue.asInstanceOf[java.util.Map[AnyRef,AnyRef]]
+      }
+      footer_map.put(ORIGIN, session_id.get)
+      val message = new AmqpMessage(null, dm)
+
       val d = new Delivery
       d.message = message
       d.size = message.encoded.length
@@ -877,6 +894,7 @@ class AmqpProtocolHandler extends Protoc
                      val addresses: Array[_ <: BindAddress],
                      val presettle: Boolean,
                      val selector: (String, BooleanExpression),
+                     val noLocal:Boolean,
                      override val browser: Boolean,
                      override val exclusive: Boolean,
                      val include_seq: Option[Long],
@@ -922,8 +940,14 @@ class AmqpProtocolHandler extends Protoc
     override def connection = Option(AmqpProtocolHandler.this.connection)
 
     def is_persistent = false
-    def matches(delivery: Delivery) = {
+    def matches(delivery: Delivery):Boolean = {
       if( delivery.message.codec eq AmqpMessageCodec ) {
+        if ( noLocal ) {
+          val origin = delivery.message.asInstanceOf[AmqpMessage].getFooterProperty(ORIGIN)
+          if ( origin == session_id.get ) {
+            return false
+          }
+        }
         if( selector!=null ) {
           selector._2.matches(delivery.message)
         } else {

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala?rev=1421319&r1=1421318&r2=1421319&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
Thu Dec 13 14:47:30 2012
@@ -17,7 +17,7 @@
 
 package org.apache.activemq.apollo.amqp.test
 
-import org.apache.qpid.amqp_1_0.jms.impl.{ConnectionFactoryImpl, QueueImpl}
+import org.apache.qpid.amqp_1_0.jms.impl.{TopicImpl, ConnectionFactoryImpl, QueueImpl}
 import javax.jms._
 
 
@@ -53,8 +53,8 @@ object QpidJmsTest {
  */
 class QpidJmsTest extends AmqpTestSupport {
 
-  def createConnection: Connection = {
-    val factory = new ConnectionFactoryImpl("localhost", port, "admin", "password")
+  def createConnection(clientId:String=null): Connection = {
+    val factory = new ConnectionFactoryImpl("localhost", port, "admin", "password", clientId)
     val connection = factory.createConnection
     connection.setExceptionListener(new ExceptionListener {
       def onException(exception: JMSException) {
@@ -65,9 +65,51 @@ class QpidJmsTest extends AmqpTestSuppor
     return connection
   }
 
+  test("NoLocal Test") {
+
+    val default_connection = createConnection("clientid")
+    val destination = new TopicImpl("topic://example")
+    val localSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    var localConsumer = localSession.createConsumer(destination, null, true)
+    var localProducer = localSession.createProducer(destination)
+
+    val remoteConnection = createConnection()
+    val remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    var remoteProducer = remoteSession.createProducer(destination)
+
+    remoteProducer.send(localSession.createTextMessage("1"))
+    localProducer.send(localSession.createTextMessage("2"))
+    remoteProducer.send(localSession.createTextMessage("3"))
+
+    receive_text(localConsumer) should equal("1")
+    receive_text(localConsumer) should equal("3")
+  }
+
+  test("NoLocal Durable Sub Test") {
+
+    val default_connection = createConnection("clientid")
+    val destination = new TopicImpl("topic://example2")
+    val localSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    var localConsumer = localSession.createDurableSubscriber(destination, "A", null, true)
+    var localProducer = localSession.createProducer(destination)
+
+    val remoteConnection = createConnection()
+    val remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    var remoteProducer = remoteSession.createProducer(destination)
+
+    remoteProducer.send(localSession.createTextMessage("1"))
+    localProducer.send(localSession.createTextMessage("2"))
+    remoteProducer.send(localSession.createTextMessage("3"))
+
+    receive_text(localConsumer) should equal("1")
+    receive_text(localConsumer) should equal("3")
+  }
+
+  def receive_text(consumer:MessageConsumer) = consumer.receive().asInstanceOf[TextMessage].getText
+
   test("unsubscribe invalid dest") {
     val queue = new QueueImpl("queue://txqueue")
-    val connection = createConnection
+    val connection = createConnection()
     val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
     session.unsubscribe("does not exist")
     connection.close
@@ -75,7 +117,7 @@ class QpidJmsTest extends AmqpTestSuppor
 
   ignore("browse") {
     val queue = new QueueImpl("queue://txqueue")
-    val connection = createConnection
+    val connection = createConnection()
     val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
     val p = session.createProducer(queue)
     val msg = session.createTextMessage("Hello World")
@@ -95,7 +137,7 @@ class QpidJmsTest extends AmqpTestSuppor
     val nMsgs = 1
     val dataFormat: String = "%01024d"
 
-    var connection = createConnection
+    var connection = createConnection()
     var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
     val p = session.createProducer(queue)
     var i = 0
@@ -109,7 +151,7 @@ class QpidJmsTest extends AmqpTestSuppor
     System.out.println("=======================================================================================")
     System.out.println(" failing a receive ")
     System.out.println("=======================================================================================")
-    connection = createConnection
+    connection = createConnection()
     session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)
     var c = session.createConsumer(queue)
     i = 0
@@ -127,7 +169,7 @@ class QpidJmsTest extends AmqpTestSuppor
     System.out.println("=======================================================================================")
     System.out.println(" receiving ")
     System.out.println("=======================================================================================")
-    connection = createConnection
+    connection = createConnection()
     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
     c = session.createConsumer(queue)
     i = 0

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala?rev=1421319&r1=1421318&r2=1421319&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
Thu Dec 13 14:47:30 2012
@@ -42,6 +42,7 @@ class SecurityContext {
   var remote_address:SocketAddress = _
   var login_context:LoginContext = _
   var session_id:Option[String] = None
+  var remote_application:Option[String] = None
 
   case class Key(user:String,
     password:String,



Mime
View raw message