Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 80269200BA0 for ; Fri, 14 Oct 2016 21:01:35 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7EC2F160ADD; Fri, 14 Oct 2016 19:01:35 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7692B160AD3 for ; Fri, 14 Oct 2016 21:01:34 +0200 (CEST) Received: (qmail 36388 invoked by uid 500); 14 Oct 2016 19:01:33 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 36379 invoked by uid 99); 14 Oct 2016 19:01:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Oct 2016 19:01:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 65EAAE00C7; Fri, 14 Oct 2016 19:01:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-6467 Date: Fri, 14 Oct 2016 19:01:33 +0000 (UTC) archived-at: Fri, 14 Oct 2016 19:01:35 -0000 Repository: activemq Updated Branches: refs/heads/activemq-5.14.x cf72e7214 -> 335a490ea https://issues.apache.org/jira/browse/AMQ-6467 Fill in the source / target created for the requested dynamic node with the actual attributes we are going to support. 
(cherry picked from commit 10fc397ab72bcd3c111b517af687ccf4c9372ced) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/335a490e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/335a490e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/335a490e Branch: refs/heads/activemq-5.14.x Commit: 335a490eac84680bab50350a2bb5894c0248f18c Parents: cf72e72 Author: Timothy Bish Authored: Fri Oct 14 15:00:49 2016 -0400 Committer: Timothy Bish Committed: Fri Oct 14 15:01:25 2016 -0400 ---------------------------------------------------------------------- .../activemq/transport/amqp/AmqpSupport.java | 28 +++++++++++++++++++- .../transport/amqp/protocol/AmqpConnection.java | 2 +- .../transport/amqp/protocol/AmqpSession.java | 22 +++++++++++++-- .../amqp/interop/AmqpTempDestinationTest.java | 26 +++++++++++++++++- 4 files changed, 73 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/335a490e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java index 9087cf1..43cecc8 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -44,6 +44,8 @@ public class AmqpSupport { // Capabilities used to identify destination type in some requests. 
public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue"); public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic"); + public static final Symbol QUEUE_CAPABILITY = Symbol.valueOf("queue"); + public static final Symbol TOPIC_CAPABILITY = Symbol.valueOf("topic"); // Symbols used to announce connection information to remote peer. public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field"); @@ -214,4 +216,28 @@ public class AmqpSupport { throw new RuntimeException("Unexpected terminus type: " + endpoint); } } + + /** + * Given an ActiveMQDestination return the proper Capability value for the concrete destination type. + * + * @param destination + * The ActiveMQDestination whose capability is being requested. + * + * @return a Symbol that matches the defined Capability value for the ActiveMQDestination. + */ + public static Symbol getDestinationTypeSymbol(ActiveMQDestination destination) { + if (destination.isQueue()) { + if (destination.isTemporary()) { + return TEMP_QUEUE_CAPABILITY; + } else { + return QUEUE_CAPABILITY; + } + } else { + if (destination.isTemporary()) { + return TEMP_TOPIC_CAPABILITY; + } else { + return TOPIC_CAPABILITY; + } + } + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/335a490e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index 5a402ba..3696a68 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor 
license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. http://git-wip-us.apache.org/repos/asf/activemq/blob/335a490e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java index 1c91962..0527211 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.protocol; import static org.apache.activemq.transport.amqp.AmqpSupport.COPY; import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS; import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME; +import static org.apache.activemq.transport.amqp.AmqpSupport.LIFETIME_POLICY; import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS; import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME; import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination; @@ -46,10 +47,12 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.transport.amqp.AmqpProtocolConverter; import org.apache.activemq.transport.amqp.AmqpProtocolException; +import org.apache.activemq.transport.amqp.AmqpSupport; import org.apache.activemq.transport.amqp.ResponseHandler; import 
org.apache.activemq.util.IntrospectionSupport; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.DeleteOnClose; import org.apache.qpid.proton.amqp.messaging.Target; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; @@ -186,9 +189,17 @@ public class AmqpSession implements AmqpResource { if (target.getDynamic()) { destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities()); + + Map dynamicNodeProperties = new HashMap(); + dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance()); + + // Currently we only support temporary destinations with delete on close lifetime policy. Target actualTarget = new Target(); actualTarget.setAddress(destination.getQualifiedName()); + actualTarget.setCapabilities(AmqpSupport.getDestinationTypeSymbol(destination)); actualTarget.setDynamic(true); + actualTarget.setDynamicNodeProperties(dynamicNodeProperties); + protonReceiver.setTarget(actualTarget); receiver.addCloseAction(new Runnable() { @@ -298,11 +309,18 @@ public class AmqpSession implements AmqpResource { return; } } else if (source.getDynamic()) { - // lets create a temp dest. destination = connection.createTemporaryDestination(protonSender, source.getCapabilities()); + + Map dynamicNodeProperties = new HashMap(); + dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance()); + + // Currently we only support temporary destinations with delete on close lifetime policy. 
source = new org.apache.qpid.proton.amqp.messaging.Source(); source.setAddress(destination.getQualifiedName()); + source.setCapabilities(AmqpSupport.getDestinationTypeSymbol(destination)); source.setDynamic(true); + source.setDynamicNodeProperties(dynamicNodeProperties); + sender.addCloseAction(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/335a490e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java index b476993..fb5e9e1 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
@@ -21,6 +21,7 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILI import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.HashMap; @@ -28,6 +29,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.transport.amqp.AmqpSupport; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -120,6 +122,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport { doTestCreateDynamicSender(false); } + @SuppressWarnings("unchecked") protected void doTestCreateDynamicSender(boolean topic) throws Exception { Target target = createDynamicTarget(topic); @@ -132,10 +135,20 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport { AmqpSender sender = session.createSender(target); assertNotNull(sender); + Target remoteTarget = (Target) sender.getEndpoint().getRemoteTarget(); + Map dynamicNodeProperties = remoteTarget.getDynamicNodeProperties(); + Symbol[] capabilites = remoteTarget.getCapabilities(); + + assertTrue(Boolean.TRUE.equals(remoteTarget.getDynamic())); + assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY)); + assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY)); + if (topic) { assertEquals(1, brokerView.getTemporaryTopics().length); + assertTrue(AmqpSupport.contains(capabilites, TEMP_TOPIC_CAPABILITY)); } else { assertEquals(1, brokerView.getTemporaryQueues().length); + assertTrue(AmqpSupport.contains(capabilites, TEMP_QUEUE_CAPABILITY)); } connection.close(); @@ -190,6 +203,7 @@ public class AmqpTempDestinationTest extends 
AmqpClientTestSupport { doTestCreateDynamicSender(false); } + @SuppressWarnings("unchecked") protected void doTestCreateDynamicReceiver(boolean topic) throws Exception { Source source = createDynamicSource(topic); @@ -202,10 +216,20 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport { AmqpReceiver receiver = session.createReceiver(source); assertNotNull(receiver); + Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource(); + Map dynamicNodeProperties = remoteSource.getDynamicNodeProperties(); + Symbol[] capabilites = remoteSource.getCapabilities(); + + assertTrue(Boolean.TRUE.equals(remoteSource.getDynamic())); + assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY)); + assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY)); + if (topic) { assertEquals(1, brokerView.getTemporaryTopics().length); + assertTrue(AmqpSupport.contains(capabilites, TEMP_TOPIC_CAPABILITY)); } else { assertEquals(1, brokerView.getTemporaryQueues().length); + assertTrue(AmqpSupport.contains(capabilites, TEMP_QUEUE_CAPABILITY)); } connection.close();