Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B17F9200BBA for ; Fri, 21 Oct 2016 11:56:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B0214160B10; Fri, 21 Oct 2016 09:56:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8111A160AE9 for ; Fri, 21 Oct 2016 11:56:06 +0200 (CEST) Received: (qmail 68453 invoked by uid 500); 21 Oct 2016 09:56:05 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 68177 invoked by uid 99); 21 Oct 2016 09:56:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Oct 2016 09:56:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 11007F15B4; Fri, 21 Oct 2016 09:56:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: martyntaylor@apache.org To: commits@activemq.apache.org Date: Fri, 21 Oct 2016 09:56:17 -0000 Message-Id: <77236950c389465e986e23bf90155bcd@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [14/41] activemq-artemis git commit: ARTEMIS-795 Tests for temporary destinations using dynamic nodes archived-at: Fri, 21 Oct 2016 09:56:07 -0000 ARTEMIS-795 Tests for temporary destinations using dynamic nodes Tests for the management of temporary destinations using the dynamic node feature. Failing case, the broker return a source or target that indicates it will honor the lifetime policy of delete on close but the temporary destination remains in existence after the link it closed. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f1728abb Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f1728abb Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f1728abb Branch: refs/heads/ARTEMIS-780 Commit: f1728abb5f04bef29f4640114444060c4ff95f24 Parents: 6052f7a Author: Timothy Bish Authored: Tue Oct 11 18:51:07 2016 -0400 Committer: Martyn Taylor Committed: Fri Oct 14 10:24:13 2016 +0100 ---------------------------------------------------------------------- .../amqp/AmqpTempDestinationTest.java | 329 +++++++++++++++++++ 1 file changed, 329 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f1728abb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java new file mode 100644 index 0000000..4dbe21e --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import static org.apache.activemq.transport.amqp.AmqpSupport.LIFETIME_POLICY; +import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY; +import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.DeleteOnClose; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for temporary destination handling over AMQP + */ +public class AmqpTempDestinationTest extends AmqpClientTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(AmqpTempDestinationTest.class); + + @Test(timeout = 60000) + public void testCreateDynamicSenderToTopic() throws Exception { + doTestCreateDynamicSender(true); + } + + @Test(timeout = 60000) + public void testCreateDynamicSenderToQueue() throws Exception { + doTestCreateDynamicSender(false); + } + + @SuppressWarnings("unchecked") + protected void doTestCreateDynamicSender(boolean topic) throws Exception { + Target target = createDynamicTarget(topic); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(target); + assertNotNull(sender); + + Target remoteTarget = (Target) sender.getEndpoint().getRemoteTarget(); + assertTrue(remoteTarget.getDynamic()); + assertTrue(remoteTarget.getDurable().equals(TerminusDurability.NONE)); + assertTrue(remoteTarget.getExpiryPolicy().equals(TerminusExpiryPolicy.LINK_DETACH)); + + // Check the dynamic node lifetime-policy + Map dynamicNodeProperties = remoteTarget.getDynamicNodeProperties(); + assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY)); + assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY)); + + Queue queueView = getProxyToQueue(remoteTarget.getAddress()); + assertNotNull(queueView); + + connection.close(); + } + + @Test(timeout = 60000) + public void testDynamicSenderLifetimeBoundToLinkTopic() throws Exception { + doTestDynamicSenderLifetimeBoundToLinkQueue(true); + } + + @Test(timeout = 60000) + public void testDynamicSenderLifetimeBoundToLinkQueue() throws Exception { + doTestDynamicSenderLifetimeBoundToLinkQueue(false); + } + + protected void doTestDynamicSenderLifetimeBoundToLinkQueue(boolean topic) throws Exception { + Target target = createDynamicTarget(topic); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(target); + assertNotNull(sender); + + Target remoteTarget = (Target) sender.getEndpoint().getRemoteTarget(); + Queue queueView = getProxyToQueue(remoteTarget.getAddress()); + assertNotNull(queueView); + + sender.close(); + + queueView = getProxyToQueue(remoteTarget.getAddress()); + assertNull(queueView); + + connection.close(); + } + + @Test(timeout = 60000) + public void testCreateDynamicReceiverToTopic() throws Exception { + doTestCreateDynamicSender(true); + } + + @Test(timeout = 60000) + public void testCreateDynamicReceiverToQueue() throws Exception { + doTestCreateDynamicSender(false); + } + + @SuppressWarnings("unchecked") + protected void doTestCreateDynamicReceiver(boolean topic) throws Exception { + Source source = createDynamicSource(topic); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(source); + assertNotNull(receiver); + + Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource(); + assertTrue(remoteSource.getDynamic()); + assertTrue(remoteSource.getDurable().equals(TerminusDurability.NONE)); + assertTrue(remoteSource.getExpiryPolicy().equals(TerminusExpiryPolicy.LINK_DETACH)); + + // Check the dynamic node lifetime-policy + Map dynamicNodeProperties = remoteSource.getDynamicNodeProperties(); + assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY)); + assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY)); + + Queue queueView = getProxyToQueue(remoteSource.getAddress()); + assertNotNull(queueView); + + connection.close(); + } + + @Test(timeout = 60000) + public void testDynamicReceiverLifetimeBoundToLinkTopic() throws Exception { + doTestDynamicReceiverLifetimeBoundToLinkQueue(true); + } + + @Test(timeout = 60000) + public void testDynamicReceiverLifetimeBoundToLinkQueue() throws Exception { + doTestDynamicReceiverLifetimeBoundToLinkQueue(false); + } + + protected void doTestDynamicReceiverLifetimeBoundToLinkQueue(boolean topic) throws Exception { + Source source = createDynamicSource(topic); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(source); + assertNotNull(receiver); + + Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource(); + Queue queueView = getProxyToQueue(remoteSource.getAddress()); + assertNotNull(queueView); + + receiver.close(); + + queueView = getProxyToQueue(remoteSource.getAddress()); + assertNull(queueView); + + connection.close(); + } + + @Test(timeout = 60000) + public void TestCreateDynamicQueueSenderAndPublish() throws Exception { + doTestCreateDynamicSenderAndPublish(false); + } + + @Test(timeout = 60000) + public void TestCreateDynamicTopicSenderAndPublish() throws Exception { + doTestCreateDynamicSenderAndPublish(true); + } + + protected void doTestCreateDynamicSenderAndPublish(boolean topic) throws Exception { + Target target = createDynamicTarget(topic); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(target); + assertNotNull(sender); + + Target remoteTarget = (Target) sender.getEndpoint().getRemoteTarget(); + Queue queueView = getProxyToQueue(remoteTarget.getAddress()); + assertNotNull(queueView); + + // Get the new address + String address = sender.getSender().getRemoteTarget().getAddress(); + LOG.info("New dynamic sender address -> {}", address); + + // Create a message and send to a receive that is listening on the newly + // created dynamic link address. + AmqpMessage message = new AmqpMessage(); + message.setMessageId("msg-1"); + message.setText("Test-Message"); + + AmqpReceiver receiver = session.createReceiver(address); + receiver.flow(1); + + sender.send(message); + + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("Should have read a message", received); + received.accept(); + + receiver.close(); + sender.close(); + + connection.close(); + } + + @Test(timeout = 60000) + public void testCreateDynamicReceiverToTopicAndSend() throws Exception { + doTestCreateDynamicSender(true); + } + + @Test(timeout = 60000) + public void testCreateDynamicReceiverToQueueAndSend() throws Exception { + doTestCreateDynamicSender(false); + } + + protected void doTestCreateDynamicReceiverAndSend(boolean topic) throws Exception { + Source source = createDynamicSource(topic); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(source); + assertNotNull(receiver); + + Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource(); + Queue queueView = getProxyToQueue(remoteSource.getAddress()); + assertNotNull(queueView); + + // Get the new address + String address = receiver.getReceiver().getRemoteSource().getAddress(); + LOG.info("New dynamic receiver address -> {}", address); + + // Create a message and send to a receive that is listening on the newly + // created dynamic link address. + AmqpMessage message = new AmqpMessage(); + message.setMessageId("msg-1"); + message.setText("Test-Message"); + + AmqpSender sender = session.createSender(address); + sender.send(message); + + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("Should have read a message", received); + received.accept(); + + sender.close(); + receiver.close(); + + connection.close(); + } + + protected Source createDynamicSource(boolean topic) { + + Source source = new Source(); + source.setDynamic(true); + source.setDurable(TerminusDurability.NONE); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + + // Set the dynamic node lifetime-policy + Map dynamicNodeProperties = new HashMap<>(); + dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance()); + source.setDynamicNodeProperties(dynamicNodeProperties); + + // Set the capability to indicate the node type being created + if (!topic) { + source.setCapabilities(TEMP_QUEUE_CAPABILITY); + } else { + source.setCapabilities(TEMP_TOPIC_CAPABILITY); + } + + return source; + } + + protected Target createDynamicTarget(boolean topic) { + + Target target = new Target(); + target.setDynamic(true); + target.setDurable(TerminusDurability.NONE); + target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + + // Set the dynamic node lifetime-policy + Map dynamicNodeProperties = new HashMap<>(); + dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance()); + target.setDynamicNodeProperties(dynamicNodeProperties); + + // Set the capability to indicate the node type being created + if (!topic) { + target.setCapabilities(TEMP_QUEUE_CAPABILITY); + } else { + target.setCapabilities(TEMP_TOPIC_CAPABILITY); + } + + return target; + } +}