activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6467
Date Fri, 14 Oct 2016 19:00:58 GMT
Repository: activemq
Updated Branches:
  refs/heads/master d9d1ae73e -> 10fc397ab


https://issues.apache.org/jira/browse/AMQ-6467

Fill in the source / target created for the requested dynamic node with
the actual attributes we are going to support.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/10fc397a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/10fc397a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/10fc397a

Branch: refs/heads/master
Commit: 10fc397ab72bcd3c111b517af687ccf4c9372ced
Parents: d9d1ae7
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Oct 14 15:00:49 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Oct 14 15:00:49 2016 -0400

----------------------------------------------------------------------
 .../activemq/transport/amqp/AmqpSupport.java    | 28 +++++++++++++++++++-
 .../transport/amqp/protocol/AmqpConnection.java |  2 +-
 .../transport/amqp/protocol/AmqpSession.java    | 22 +++++++++++++--
 .../amqp/interop/AmqpTempDestinationTest.java   | 26 +++++++++++++++++-
 4 files changed, 73 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/10fc397a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
index 9087cf1..43cecc8 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -44,6 +44,8 @@ public class AmqpSupport {
     // Capabilities used to identify destination type in some requests.
     public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
     public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
+    public static final Symbol QUEUE_CAPABILITY = Symbol.valueOf("queue");
+    public static final Symbol TOPIC_CAPABILITY = Symbol.valueOf("topic");
 
     // Symbols used to announce connection information to remote peer.
     public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");
@@ -214,4 +216,28 @@ public class AmqpSupport {
             throw new RuntimeException("Unexpected terminus type: " + endpoint);
         }
     }
+
+    /**
+     * Given an ActiveMQDestination return the proper Capability value for the concrete destination type.
+     *
+     * @param destination
+     *      The ActiveMQDestination whose capability is being requested.
+     *
+     * @return a Symbol that matches the defined Capability value for the ActiveMQDestiantion.
+     */
+    public static Symbol getDestinationTypeSymbol(ActiveMQDestination destination) {
+        if (destination.isQueue()) {
+            if (destination.isTemporary()) {
+                return TEMP_QUEUE_CAPABILITY;
+            } else {
+                return QUEUE_CAPABILITY;
+            }
+        } else {
+            if (destination.isTemporary()) {
+                return TEMP_TOPIC_CAPABILITY;
+            } else {
+                return TOPIC_CAPABILITY;
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/10fc397a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index 5a402ba..3696a68 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.

http://git-wip-us.apache.org/repos/asf/activemq/blob/10fc397a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index 1c91962..0527211 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.protocol;
 import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
 import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
 import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
+import static org.apache.activemq.transport.amqp.AmqpSupport.LIFETIME_POLICY;
 import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
 import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
 import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination;
@@ -46,10 +47,12 @@ import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.activemq.transport.amqp.AmqpSupport;
 import org.apache.activemq.transport.amqp.ResponseHandler;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
 import org.apache.qpid.proton.amqp.messaging.Target;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
@@ -186,9 +189,17 @@ public class AmqpSession implements AmqpResource {
 
             if (target.getDynamic()) {
                 destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities());
+
+                Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
+                dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
+
+                // Currently we only support temporary destinations with delete on close lifetime policy.
                 Target actualTarget = new Target();
                 actualTarget.setAddress(destination.getQualifiedName());
+                actualTarget.setCapabilities(AmqpSupport.getDestinationTypeSymbol(destination));
                 actualTarget.setDynamic(true);
+                actualTarget.setDynamicNodeProperties(dynamicNodeProperties);
+
                 protonReceiver.setTarget(actualTarget);
                 receiver.addCloseAction(new Runnable() {
 
@@ -298,11 +309,18 @@ public class AmqpSession implements AmqpResource {
                     return;
                 }
             } else if (source.getDynamic()) {
-                // lets create a temp dest.
                 destination = connection.createTemporaryDestination(protonSender, source.getCapabilities());
+
+                Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
+                dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
+
+                // Currently we only support temporary destinations with delete on close lifetime policy.
                 source = new org.apache.qpid.proton.amqp.messaging.Source();
                 source.setAddress(destination.getQualifiedName());
+                source.setCapabilities(AmqpSupport.getDestinationTypeSymbol(destination));
                 source.setDynamic(true);
+                source.setDynamicNodeProperties(dynamicNodeProperties);
+
                 sender.addCloseAction(new Runnable() {
 
                     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/10fc397a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
index b476993..fb5e9e1 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -21,6 +21,7 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILI
 import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.HashMap;
@@ -28,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.transport.amqp.AmqpSupport;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -120,6 +122,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
         doTestCreateDynamicSender(false);
     }
 
+    @SuppressWarnings("unchecked")
     protected void doTestCreateDynamicSender(boolean topic) throws Exception {
         Target target = createDynamicTarget(topic);
 
@@ -132,10 +135,20 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
         AmqpSender sender = session.createSender(target);
         assertNotNull(sender);
 
+        Target remoteTarget = (Target) sender.getEndpoint().getRemoteTarget();
+        Map<Symbol, Object> dynamicNodeProperties = remoteTarget.getDynamicNodeProperties();
+        Symbol[] capabilites = remoteTarget.getCapabilities();
+
+        assertTrue(Boolean.TRUE.equals(remoteTarget.getDynamic()));
+        assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY));
+        assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY));
+
         if (topic) {
             assertEquals(1, brokerView.getTemporaryTopics().length);
+            assertTrue(AmqpSupport.contains(capabilites, TEMP_TOPIC_CAPABILITY));
         } else {
             assertEquals(1, brokerView.getTemporaryQueues().length);
+            assertTrue(AmqpSupport.contains(capabilites, TEMP_QUEUE_CAPABILITY));
         }
 
         connection.close();
@@ -190,6 +203,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
         doTestCreateDynamicSender(false);
     }
 
+    @SuppressWarnings("unchecked")
     protected void doTestCreateDynamicReceiver(boolean topic) throws Exception {
         Source source = createDynamicSource(topic);
 
@@ -202,10 +216,20 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
         AmqpReceiver receiver = session.createReceiver(source);
         assertNotNull(receiver);
 
+        Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource();
+        Map<Symbol, Object> dynamicNodeProperties = remoteSource.getDynamicNodeProperties();
+        Symbol[] capabilites = remoteSource.getCapabilities();
+
+        assertTrue(Boolean.TRUE.equals(remoteSource.getDynamic()));
+        assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY));
+        assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY));
+
         if (topic) {
             assertEquals(1, brokerView.getTemporaryTopics().length);
+            assertTrue(AmqpSupport.contains(capabilites, TEMP_TOPIC_CAPABILITY));
         } else {
             assertEquals(1, brokerView.getTemporaryQueues().length);
+            assertTrue(AmqpSupport.contains(capabilites, TEMP_QUEUE_CAPABILITY));
         }
 
         connection.close();


Mime
View raw message