activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5666
Date Wed, 18 Mar 2015 22:21:14 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 4228e3d3e -> 8e6a404d5


https://issues.apache.org/jira/browse/AMQ-5666

Create some tests that exercise creating temp destinations using sender
links with dynamic targets

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8e6a404d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8e6a404d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8e6a404d

Branch: refs/heads/master
Commit: 8e6a404d5e15c3be2fcacfaa929b272d692012d7
Parents: 4228e3d
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Mar 18 18:12:02 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Mar 18 18:12:02 2015 -0400

----------------------------------------------------------------------
 .../activemq/transport/amqp/AmqpSupport.java    |   3 +
 .../transport/amqp/client/AmqpReceiver.java     |   5 +-
 .../transport/amqp/client/AmqpSender.java       |  36 ++++-
 .../transport/amqp/client/AmqpSession.java      |  37 ++++-
 .../amqp/interop/AmqpTempDestinationTest.java   | 137 +++++++++++++++++++
 5 files changed, 211 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8e6a404d/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
index 7af4c2c..526a043 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java
@@ -52,6 +52,9 @@ public class AmqpSupport {
     // Symbols used in configuration of newly opened links.
     public static final Symbol COPY = Symbol.getSymbol("copy");
 
+    // Lifetime policy symbols
+    public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+
     /**
      * Search for a given Symbol in a given array of Symbol object.
      *

http://git-wip-us.apache.org/repos/asf/activemq/blob/8e6a404d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 585ba93..978075c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -71,12 +71,12 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     private final AmqpSession session;
     private final String address;
     private final String receiverId;
+    private final Source userSpecifiedSource;
 
     private String subscriptionName;
     private String selector;
     private boolean presettle;
     private boolean noLocal;
-    private Source userSpecifiedSource;
 
     /**
      * Create a new receiver instance.
@@ -94,6 +94,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
             throw new IllegalArgumentException("Address cannot be empty.");
         }
 
+        this.userSpecifiedSource = null;
         this.session = session;
         this.address = address;
         this.receiverId = receiverId;
@@ -454,7 +455,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
         Source source = userSpecifiedSource;
         Target target = new Target();
 
-        if (userSpecifiedSource == null && address != null) {
+        if (source == null && address != null) {
             source = new Source();
             source.setAddress(address);
             configureSource(source);

http://git-wip-us.apache.org/repos/asf/activemq/blob/8e6a404d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index 95b0743..c8829e0 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -64,6 +64,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
     private final AmqpSession session;
     private final String address;
     private final String senderId;
+    private final Target userSpecifiedTarget;
 
     private boolean presettle;
     private long sendTimeout = DEFAULT_SEND_TIMEOUT;
@@ -82,9 +83,37 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
      *        The unique ID assigned to this sender.
      */
     public AmqpSender(AmqpSession session, String address, String senderId) {
+
+        if (address != null && address.isEmpty()) {
+            throw new IllegalArgumentException("Address cannot be empty.");
+        }
+
         this.session = session;
         this.address = address;
         this.senderId = senderId;
+        this.userSpecifiedTarget = null;
+    }
+
+    /**
+     * Create a new sender instance using the given Target when creating the link.
+     *
+     * @param session
+     *        The parent session that created the sender.
+     * @param target
+     *        The caller-configured Target used when creating the sender link.
+     * @param senderId
+     *        The unique ID assigned to this sender.
+     */
+    public AmqpSender(AmqpSession session, Target target, String senderId) {
+
+        if (target == null) {
+            throw new IllegalArgumentException("User specified Target cannot be null");
+        }
+
+        this.session = session;
+        this.userSpecifiedTarget = target;
+        this.address = target.getAddress();
+        this.senderId = senderId;
     }
 
     /**
@@ -216,8 +245,11 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
         source.setAddress(senderId);
         source.setOutcomes(outcomes);
 
-        Target target = new Target();
-        target.setAddress(address);
+        Target target = userSpecifiedTarget;
+        if (target == null) {
+            target = new Target();
+            target.setAddress(address);
+        }
 
         String senderName = senderId + ":" + address;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/8e6a404d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 3b2a3d1..8af362b 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
 import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Session;
 
@@ -82,6 +83,38 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
     }
 
     /**
+     * Create a sender instance using the given Target
+     *
+     * @param target
+     *        the caller created and configured Target used to create the sender link.
+     *
+     * @return a newly created sender that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the sender.
+     */
+    public AmqpSender createSender(Target target) throws Exception {
+        checkClosed();
+
+        final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId());
+        final ClientFuture request = new ClientFuture();
+
+        connection.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                sender.setStateInspector(getStateInspector());
+                sender.open(request);
+                pumpToProtonTransport();
+            }
+        });
+
+        request.sync();
+
+        return sender;
+    }
+
+    /**
      * Create a receiver instance using the given address
      *
      * @param address
@@ -153,10 +186,8 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
     }
 
     /**
-     * Create a receiver instance using the given address
+     * Create a receiver instance using the given Source
      *
-     * @param address
-     *        the address to which the receiver will subscribe for its messages.
      * @param source
      *        the caller created and configured Source used to create the receiver link.
      *

http://git-wip-us.apache.org/repos/asf/activemq/blob/8e6a404d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
new file mode 100644
index 0000000..18ef7dd
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTempDestinationTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.junit.Test;
+
+/**
+ * Tests for JMS temporary destination mappings to AMQP
+ */
+public class AmqpTempDestinationTest extends AmqpClientTestSupport {
+
+    @Test(timeout = 60000)
+    public void testCreateDynamicSenderToTopic() throws Exception {
+        doTestCreateDynamicSender(true);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateDynamicSenderToQueue() throws Exception {
+        doTestCreateDynamicSender(false);
+    }
+
+    protected void doTestCreateDynamicSender(boolean topic) throws Exception {
+        Target target = createDynamicTarget(topic);
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender(target);
+        assertNotNull(sender);
+
+        if (topic) {
+            assertEquals(1, brokerView.getTemporaryTopics().length);
+        } else {
+            assertEquals(1, brokerView.getTemporaryQueues().length);
+        }
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testDynamicSenderLifetimeBoundToLinkTopic() throws Exception {
+        doTestDynamicSenderLifetimeBoundToLinkQueue(true);
+    }
+
+    @Test(timeout = 60000)
+    public void testDynamicSenderLifetimeBoundToLinkQueue() throws Exception {
+        doTestDynamicSenderLifetimeBoundToLinkQueue(false);
+    }
+
+    protected void doTestDynamicSenderLifetimeBoundToLinkQueue(boolean topic) throws Exception {
+        Target target = createDynamicTarget(topic);
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender(target);
+        assertNotNull(sender);
+
+        if (topic) {
+            assertEquals(1, brokerView.getTemporaryTopics().length);
+        } else {
+            assertEquals(1, brokerView.getTemporaryQueues().length);
+        }
+
+        sender.close();
+
+        if (topic) {
+            assertEquals(0, brokerView.getTemporaryTopics().length);
+        } else {
+            assertEquals(0, brokerView.getTemporaryQueues().length);
+        }
+
+        connection.close();
+    }
+
+    protected Target createDynamicTarget(boolean topic) {
+
+        Target target = new Target();
+        target.setDynamic(true);
+        target.setDurable(TerminusDurability.NONE);
+        target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+
+        // Set the dynamic node lifetime-policy
+        Map<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
+        dynamicNodeProperties.put(DYNAMIC_NODE_LIFETIME_POLICY, DeleteOnClose.getInstance());
+        target.setDynamicNodeProperties(dynamicNodeProperties);
+
+        // Set the capability to indicate the node type being created
+        if (!topic) {
+            target.setCapabilities(TEMP_QUEUE_CAPABILITY);
+        } else {
+            target.setCapabilities(TEMP_TOPIC_CAPABILITY);
+        }
+
+        return target;
+    }
+}


Mime
View raw message