activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1068 JMS + AMQP routing
Date Mon, 27 Mar 2017 19:18:37 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 78d0193fc -> 2ef0d2601


ARTEMIS-1068 JMS + AMQP routing


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c792b8e2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c792b8e2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c792b8e2

Branch: refs/heads/master
Commit: c792b8e2741d24aef24f07b78c733ebf5f225ed7
Parents: 78d0193
Author: Justin Bertram <jbertram@apache.org>
Authored: Sat Mar 25 08:13:25 2017 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Mar 27 15:14:43 2017 -0400

----------------------------------------------------------------------
 .../artemis/protocol/amqp/broker/AMQPMessage.java     | 11 +++++++++++
 .../protocol/amqp/broker/AMQPSessionCallback.java     |  4 ++++
 .../artemis/protocol/amqp/proton/AmqpSupport.java     |  2 ++
 .../amqp/proton/ProtonServerReceiverContext.java      | 14 +++++++++++++-
 4 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c792b8e2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 522ae16..d241958 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -256,6 +256,17 @@ public class AMQPMessage extends RefCountMessage {
       if (routingType != null) {
          return RoutingType.getType((byte) routingType);
       } else {
+         routingType = getSymbol(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
+         if (routingType != null) {
+            if (AMQPMessageSupport.QUEUE_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_QUEUE_TYPE
== (byte) routingType) {
+               return RoutingType.ANYCAST;
+            } else if (AMQPMessageSupport.TOPIC_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_TOPIC_TYPE
== (byte) routingType) {
+               return RoutingType.MULTICAST;
+            }
+         } else {
+            return null;
+         }
+
          return null;
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c792b8e2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 034cb72..18294e0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -575,4 +575,8 @@ public class AMQPSessionCallback implements SessionCallback {
    public void removeTemporaryQueue(String address) throws Exception {
       serverSession.deleteQueue(SimpleString.toSimpleString(address));
    }
+
+   public RoutingType getDefaultRoutingType(String address) {
+      return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c792b8e2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index 227ee5d..3a36f16 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -39,6 +39,8 @@ public class AmqpSupport {
    // Capabilities used to identify destination type in some requests.
    public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
    public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
+   public static final Symbol QUEUE_CAPABILITY = Symbol.valueOf("queue");
+   public static final Symbol TOPIC_CAPABILITY = Symbol.valueOf("topic");
 
    // Symbols used to announce connection information to remote peer.
    public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c792b8e2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 34a522f..596e93a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -86,7 +86,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
             address = sessionSPI.tempQueueName();
 
             try {
-               sessionSPI.createTemporaryQueue(address, RoutingType.ANYCAST);
+               sessionSPI.createTemporaryQueue(address, getRoutingType(target.getCapabilities()));
             } catch (Exception e) {
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
             }
@@ -122,6 +122,18 @@ public class ProtonServerReceiverContext extends ProtonInitializable
implements
       flow(maxCreditAllocation, minCreditRefresh);
    }
 
+   private RoutingType getRoutingType(Symbol[] symbols) {
+      for (Symbol symbol : symbols) {
+         if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol))
{
+            return RoutingType.MULTICAST;
+         } else if (AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(symbol) || AmqpSupport.QUEUE_CAPABILITY.equals(symbol))
{
+            return RoutingType.ANYCAST;
+         }
+      }
+
+      return sessionSPI.getDefaultRoutingType(address);
+   }
+
    /*
    * called when Proton receives a message to be delivered via a Delivery.
    *


Mime
View raw message