Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 757B310B18 for ; Thu, 12 Feb 2015 20:11:28 +0000 (UTC) Received: (qmail 58232 invoked by uid 500); 12 Feb 2015 20:11:28 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 58094 invoked by uid 500); 12 Feb 2015 20:11:28 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 58069 invoked by uid 99); 12 Feb 2015 20:11:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Feb 2015 20:11:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 37708E01BD; Thu, 12 Feb 2015 20:11:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Thu, 12 Feb 2015 20:11:28 -0000 Message-Id: <3bf48439242e4c71832c0a0ceac4d699@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] activemq-6 git commit: ACTIVEMQ-77 auto queue creation on AMQP send/rec Repository: activemq-6 Updated Branches: refs/heads/master 87966029c -> 5f65c07d3 ACTIVEMQ-77 auto queue creation on AMQP send/rec Implements a new feature for the broker whereby it may automatically create and delete queues which are not explicitly defined through the management API or file-based configuration when a client sends a message to or receives from a queue via the AMQP protocol. Note, the destination has to be named like "jms.queue.*" to be auto- created. The queue may subsequently be deleted when it no longer has any messages and consumers. Auto-creation and auto-deletion can both be turned on/off via address-setting. Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/2cbef28c Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/2cbef28c Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/2cbef28c Branch: refs/heads/master Commit: 2cbef28cc727dc002031d01c209d89725fba1147 Parents: 8796602 Author: jbertram Authored: Tue Feb 10 15:18:44 2015 -0600 Committer: jbertram Committed: Thu Feb 12 14:02:45 2015 -0600 ---------------------------------------------------------------------- .../plug/ProtonSessionIntegrationCallback.java | 22 +++- .../tests/integration/proton/ProtonTest.java | 106 +++++++++++++++++++ 2 files changed, 127 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/2cbef28c/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 5fe4472..5e0a9de 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -150,8 +150,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se @Override public boolean queueQuery(String queueName) throws Exception { + boolean queryResult = false; + QueueQueryResult queueQuery = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); - return queueQuery.isExists(); + + if (queueQuery.isExists()) + { + queryResult = true; + } + else + { + if (queueQuery.isAutoCreateJmsQueues()) + { + serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true); + queryResult = true; + } + else + { + queryResult = false; + } + } + + return queryResult; } @Override http://git-wip-us.apache.org/repos/asf/activemq-6/blob/2cbef28c/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java index 79cb3b5..3665c73 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java @@ -40,6 +40,8 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Random; +import org.apache.activemq.api.core.management.ResourceNames; +import org.apache.activemq.tests.util.RandomUtil; import org.apache.qpid.amqp_1_0.client.Receiver; import org.apache.qpid.amqp_1_0.client.Sender; import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; @@ -886,6 +888,110 @@ public class ProtonTest extends ServiceTestBase } + @Test + public void testUsingPlainAMQPSenderWithNonExistantQueue() throws Exception + { + if (this.protocol != 0 && protocol != 3) + { + return; + } + + String queue = ResourceNames.JMS_QUEUE + RandomUtil.randomString(); + + org.apache.qpid.amqp_1_0.client.Connection connection = null; + + try + { + // Step 1. Create an amqp qpid 1.0 connection + connection = new org.apache.qpid.amqp_1_0.client.Connection("localhost", 5672, null, null); + + // Step 2. Create a session + org.apache.qpid.amqp_1_0.client.Session session = connection.createSession(); + + // Step 3. Create a sender + Sender sender = session.createSender(queue); + + assertNotNull(server.locateQueue(new SimpleString(queue))); + + // Step 4. send a simple message + sender.send(new org.apache.qpid.amqp_1_0.client.Message("I am an amqp message")); + + // Step 5. create a moving receiver, this means the message will be removed from the queue + Receiver rec = session.createMovingReceiver(queue); + + // Step 6. set some credit so we can receive + rec.setCredit(UnsignedInteger.valueOf(1), false); + + // Step 7. receive the simple message + org.apache.qpid.amqp_1_0.client.Message m = rec.receive(5000); + System.out.println("message = " + m.getPayload()); + + // Step 8. acknowledge the message + rec.acknowledge(m); + } + finally + { + if (connection != null) + { + // Step 9. close the connection + connection.close(); + } + } + } + + + @Test + public void testUsingPlainAMQPReceiverWithNonExistantQueue() throws Exception + { + if (this.protocol != 0 && protocol != 3) + { + return; + } + + String queue = ResourceNames.JMS_QUEUE + RandomUtil.randomString(); + + org.apache.qpid.amqp_1_0.client.Connection connection = null; + + try + { + // Step 1. Create an amqp qpid 1.0 connection + connection = new org.apache.qpid.amqp_1_0.client.Connection("localhost", 5672, null, null); + + // Step 2. Create a session + org.apache.qpid.amqp_1_0.client.Session session = connection.createSession(); + + // Step 3. create a moving receiver, this means the message will be removed from the queue + Receiver rec = session.createMovingReceiver(queue); + + assertNotNull(server.locateQueue(new SimpleString(queue))); + + // Step 4. Create a sender + Sender sender = session.createSender(queue); + + // Step 5. send a simple message + sender.send(new org.apache.qpid.amqp_1_0.client.Message("I am an amqp message")); + + // Step 6. set some credit so we can receive + rec.setCredit(UnsignedInteger.valueOf(1), false); + + // Step 7. receive the simple message + org.apache.qpid.amqp_1_0.client.Message m = rec.receive(5000); + System.out.println("message = " + m.getPayload()); + + // Step 8. acknowledge the message + rec.acknowledge(m); + } + finally + { + if (connection != null) + { + // Step 9. close the connection + connection.close(); + } + } + } + + private javax.jms.Queue createQueue(String address) { if (protocol == 0 || protocol == 3)