activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-6 git commit: ACTIVEMQ-77 auto queue creation on AMQP send/rec
Date Thu, 12 Feb 2015 20:11:28 GMT
Repository: activemq-6
Updated Branches:
  refs/heads/master 87966029c -> 5f65c07d3


ACTIVEMQ-77 auto queue creation on AMQP send/rec

Implements a new feature for the broker whereby it may automatically
create and delete queues which are not explicitly defined through
the management API or file-based configuration when a client sends a
message to or receives from a queue via the AMQP protocol. Note,
the destination has to be named like "jms.queue.*" to be auto-
created. The queue may subsequently be deleted when it no longer has
any messages and consumers. Auto-creation and auto-deletion can both
be turned on/off via address-setting.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/2cbef28c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/2cbef28c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/2cbef28c

Branch: refs/heads/master
Commit: 2cbef28cc727dc002031d01c209d89725fba1147
Parents: 8796602
Author: jbertram <jbertram@redhat.com>
Authored: Tue Feb 10 15:18:44 2015 -0600
Committer: jbertram <jbertram@redhat.com>
Committed: Thu Feb 12 14:02:45 2015 -0600

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  |  22 +++-
 .../tests/integration/proton/ProtonTest.java    | 106 +++++++++++++++++++
 2 files changed, 127 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/2cbef28c/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 5fe4472..5e0a9de 100644
--- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -150,8 +150,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback,
Se
    @Override
    public boolean queueQuery(String queueName) throws Exception
    {
+      boolean queryResult = false;
+
       QueueQueryResult queueQuery = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
-      return queueQuery.isExists();
+
+      if (queueQuery.isExists())
+      {
+         queryResult = true;
+      }
+      else
+      {
+         if (queueQuery.isAutoCreateJmsQueues())
+         {
+            serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName),
null, false, true);
+            queryResult = true;
+         }
+         else
+         {
+            queryResult = false;
+         }
+      }
+
+      return queryResult;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/2cbef28c/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java
index 79cb3b5..3665c73 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/proton/ProtonTest.java
@@ -40,6 +40,8 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Random;
 
+import org.apache.activemq.api.core.management.ResourceNames;
+import org.apache.activemq.tests.util.RandomUtil;
 import org.apache.qpid.amqp_1_0.client.Receiver;
 import org.apache.qpid.amqp_1_0.client.Sender;
 import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
@@ -886,6 +888,110 @@ public class ProtonTest extends ServiceTestBase
    }
 
 
+   @Test
+   public void testUsingPlainAMQPSenderWithNonExistantQueue() throws Exception
+   {
+      if (this.protocol != 0 && protocol != 3)
+      {
+         return;
+      }
+
+      String queue = ResourceNames.JMS_QUEUE + RandomUtil.randomString();
+
+      org.apache.qpid.amqp_1_0.client.Connection connection = null;
+
+      try
+      {
+         // Step 1. Create an amqp qpid 1.0 connection
+         connection = new org.apache.qpid.amqp_1_0.client.Connection("localhost", 5672, null,
null);
+
+         // Step 2. Create a session
+         org.apache.qpid.amqp_1_0.client.Session session = connection.createSession();
+
+         // Step 3. Create a sender
+         Sender sender = session.createSender(queue);
+
+         assertNotNull(server.locateQueue(new SimpleString(queue)));
+
+         // Step 4. send a simple message
+         sender.send(new org.apache.qpid.amqp_1_0.client.Message("I am an amqp message"));
+
+         // Step 5. create a moving receiver, this means the message will be removed from
the queue
+         Receiver rec = session.createMovingReceiver(queue);
+
+         // Step 6. set some credit so we can receive
+         rec.setCredit(UnsignedInteger.valueOf(1), false);
+
+         // Step 7. receive the simple message
+         org.apache.qpid.amqp_1_0.client.Message m = rec.receive(5000);
+         System.out.println("message = " + m.getPayload());
+
+         // Step 8. acknowledge the message
+         rec.acknowledge(m);
+      }
+      finally
+      {
+         if (connection != null)
+         {
+            // Step 9. close the connection
+            connection.close();
+         }
+      }
+   }
+
+
+   @Test
+   public void testUsingPlainAMQPReceiverWithNonExistantQueue() throws Exception
+   {
+      if (this.protocol != 0 && protocol != 3)
+      {
+         return;
+      }
+
+      String queue = ResourceNames.JMS_QUEUE + RandomUtil.randomString();
+
+      org.apache.qpid.amqp_1_0.client.Connection connection = null;
+
+      try
+      {
+         // Step 1. Create an amqp qpid 1.0 connection
+         connection = new org.apache.qpid.amqp_1_0.client.Connection("localhost", 5672, null,
null);
+
+         // Step 2. Create a session
+         org.apache.qpid.amqp_1_0.client.Session session = connection.createSession();
+
+         // Step 3. create a moving receiver, this means the message will be removed from
the queue
+         Receiver rec = session.createMovingReceiver(queue);
+
+         assertNotNull(server.locateQueue(new SimpleString(queue)));
+
+         // Step 4. Create a sender
+         Sender sender = session.createSender(queue);
+
+         // Step 5. send a simple message
+         sender.send(new org.apache.qpid.amqp_1_0.client.Message("I am an amqp message"));
+
+         // Step 6. set some credit so we can receive
+         rec.setCredit(UnsignedInteger.valueOf(1), false);
+
+         // Step 7. receive the simple message
+         org.apache.qpid.amqp_1_0.client.Message m = rec.receive(5000);
+         System.out.println("message = " + m.getPayload());
+
+         // Step 8. acknowledge the message
+         rec.acknowledge(m);
+      }
+      finally
+      {
+         if (connection != null)
+         {
+            // Step 9. close the connection
+            connection.close();
+         }
+      }
+   }
+
+
    private javax.jms.Queue createQueue(String address)
    {
       if (protocol == 0 || protocol == 3)


Mime
View raw message