Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 44DBD17759 for ; Wed, 18 Mar 2015 22:21:15 +0000 (UTC) Received: (qmail 32924 invoked by uid 500); 18 Mar 2015 22:21:15 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 32880 invoked by uid 500); 18 Mar 2015 22:21:15 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 32871 invoked by uid 99); 18 Mar 2015 22:21:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Mar 2015 22:21:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DEBE3E18D9; Wed, 18 Mar 2015 22:21:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-5666 Date: Wed, 18 Mar 2015 22:21:14 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/master 4228e3d3e -> 8e6a404d5 https://issues.apache.org/jira/browse/AMQ-5666 Create some tests that exercise creating temp destinations using sender links with dynamic targets Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8e6a404d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8e6a404d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8e6a404d Branch: refs/heads/master Commit: 
8e6a404d5e15c3be2fcacfaa929b272d692012d7 Parents: 4228e3d Author: Timothy Bish Authored: Wed Mar 18 18:12:02 2015 -0400 Committer: Timothy Bish Committed: Wed Mar 18 18:12:02 2015 -0400 ---------------------------------------------------------------------- .../activemq/transport/amqp/AmqpSupport.java | 3 + .../transport/amqp/client/AmqpReceiver.java | 5 +- .../transport/amqp/client/AmqpSender.java | 36 ++++- .../transport/amqp/client/AmqpSession.java | 37 ++++- .../amqp/interop/AmqpTempDestinationTest.java | 137 +++++++++++++++++++ 5 files changed, 211 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/8e6a404d/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java index 7af4c2c..526a043 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java @@ -52,6 +52,9 @@ public class AmqpSupport { // Symbols used in configuration of newly opened links. public static final Symbol COPY = Symbol.getSymbol("copy"); + // Lifetime policy symbols + public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); + /** * Search for a given Symbol in a given array of Symbol object. 
* http://git-wip-us.apache.org/repos/asf/activemq/blob/8e6a404d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 585ba93..978075c 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -71,12 +71,12 @@ public class AmqpReceiver extends AmqpAbstractResource { private final AmqpSession session; private final String address; private final String receiverId; + private final Source userSpecifiedSource; private String subscriptionName; private String selector; private boolean presettle; private boolean noLocal; - private Source userSpecifiedSource; /** * Create a new receiver instance. 
@@ -94,6 +94,7 @@ public class AmqpReceiver extends AmqpAbstractResource { throw new IllegalArgumentException("Address cannot be empty."); } + this.userSpecifiedSource = null; this.session = session; this.address = address; this.receiverId = receiverId; @@ -454,7 +455,7 @@ public class AmqpReceiver extends AmqpAbstractResource { Source source = userSpecifiedSource; Target target = new Target(); - if (userSpecifiedSource == null && address != null) { + if (source == null && address != null) { source = new Source(); source.setAddress(address); configureSource(source); http://git-wip-us.apache.org/repos/asf/activemq/blob/8e6a404d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 95b0743..c8829e0 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -64,6 +64,7 @@ public class AmqpSender extends AmqpAbstractResource { private final AmqpSession session; private final String address; private final String senderId; + private final Target userSpecifiedTarget; private boolean presettle; private long sendTimeout = DEFAULT_SEND_TIMEOUT; @@ -82,9 +83,37 @@ public class AmqpSender extends AmqpAbstractResource { * The unique ID assigned to this sender. */ public AmqpSender(AmqpSession session, String address, String senderId) { + + if (address != null && address.isEmpty()) { + throw new IllegalArgumentException("Address cannot be empty."); + } + this.session = session; this.address = address; this.senderId = senderId; + this.userSpecifiedTarget = null; + } + + /** + * Create a new sender instance using the given Target when creating the link. 
+ * + * @param session + * The parent session that created the sender. + * @param target + * The caller created and configured Target used to create the sender link. + * @param senderId + * The unique ID assigned to this sender. + */ + public AmqpSender(AmqpSession session, Target target, String senderId) { + + if (target == null) { + throw new IllegalArgumentException("User specified Target cannot be null"); + } + + this.session = session; + this.userSpecifiedTarget = target; + this.address = target.getAddress(); + this.senderId = senderId; } /** @@ -216,8 +245,11 @@ public class AmqpSender extends AmqpAbstractResource { source.setAddress(senderId); source.setOutcomes(outcomes); - Target target = new Target(); - target.setAddress(address); + Target target = userSpecifiedTarget; + if (target == null) { + target = new Target(); + target.setAddress(address); + } String senderName = senderId + ":" + address; http://git-wip-us.apache.org/repos/asf/activemq/blob/8e6a404d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 3b2a3d1..8af362b 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Session; @@ -82,6 +83,38 @@ public class AmqpSession extends AmqpAbstractResource { 
} /** + * Create a sender instance using the given Target + * + * @param target + * the caller created and configured Target used to create the sender link. + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender(Target target) throws Exception { + checkClosed(); + + final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId()); + final ClientFuture request = new ClientFuture(); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + sender.setStateInspector(getStateInspector()); + sender.open(request); + pumpToProtonTransport(); + } + }); + + request.sync(); + + return sender; + } + + /** * Create a receiver instance using the given address * * @param address @@ -153,10 +186,8 @@ public class AmqpSession extends AmqpAbstractResource { } /** - * Create a receiver instance using the given address + * Create a receiver instance using the given Source * - * @param address - * the address to which the receiver will subscribe for its messages. * @param source * the caller created and configured Source used to create the receiver link. * http://git-wip-us.apache.org/repos/asf/activemq/blob/8e6a404d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java new file mode 100644 index 0000000..18ef7dd --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.apache.activemq.transport.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY; +import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY; +import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.DeleteOnClose; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.junit.Test; + +/** + * Tests for JMS temporary destination mappings to AMQP + */ +public class AmqpTempDestinationTest extends 
AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testCreateDynamicSenderToTopic() throws Exception { + doTestCreateDynamicSender(true); + } + + @Test(timeout = 60000) + public void testCreateDynamicSenderToQueue() throws Exception { + doTestCreateDynamicSender(false); + } + + protected void doTestCreateDynamicSender(boolean topic) throws Exception { + Target target = createDynamicTarget(topic); + + final BrokerViewMBean brokerView = getProxyToBroker(); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(target); + assertNotNull(sender); + + if (topic) { + assertEquals(1, brokerView.getTemporaryTopics().length); + } else { + assertEquals(1, brokerView.getTemporaryQueues().length); + } + + connection.close(); + } + + @Test(timeout = 60000) + public void testDynamicSenderLifetimeBoundToLinkTopic() throws Exception { + doTestDynamicSenderLifetimeBoundToLinkQueue(true); + } + + @Test(timeout = 60000) + public void testDynamicSenderLifetimeBoundToLinkQueue() throws Exception { + doTestDynamicSenderLifetimeBoundToLinkQueue(false); + } + + protected void doTestDynamicSenderLifetimeBoundToLinkQueue(boolean topic) throws Exception { + Target target = createDynamicTarget(topic); + + final BrokerViewMBean brokerView = getProxyToBroker(); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(target); + assertNotNull(sender); + + if (topic) { + assertEquals(1, brokerView.getTemporaryTopics().length); + } else { + assertEquals(1, brokerView.getTemporaryQueues().length); + } + + sender.close(); + + if (topic) { + assertEquals(0, brokerView.getTemporaryTopics().length); + } else { + assertEquals(0, brokerView.getTemporaryQueues().length); + } + + connection.close(); + } + + protected Target 
createDynamicTarget(boolean topic) { + + Target target = new Target(); + target.setDynamic(true); + target.setDurable(TerminusDurability.NONE); + target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + + // Set the dynamic node lifetime-policy + Map dynamicNodeProperties = new HashMap(); + dynamicNodeProperties.put(DYNAMIC_NODE_LIFETIME_POLICY, DeleteOnClose.getInstance()); + target.setDynamicNodeProperties(dynamicNodeProperties); + + // Set the capability to indicate the node type being created + if (!topic) { + target.setCapabilities(TEMP_QUEUE_CAPABILITY); + } else { + target.setCapabilities(TEMP_TOPIC_CAPABILITY); + } + + return target; + } +}